Duchen's Blog

RxJava基础使用及原理分析

RxJava基础使用及原理分析

基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
}).subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});

Observable

变换的原理:lift()

先是一个Operator的概念:

Operator

1
2
3
4
5
6
7
8
9
10
11
12
// 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
onSubscribe.call(newSubscriber);
}
});
}

链式调用

当含有 lift() 时:

  1. lift() 创建了一个 Observable 后,加上之前的原始 Observable,已经有两个 Observable 了;
  2. 而同样地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了两个 OnSubscribe;
  3. 当用户调用经过 lift() 后的 Observable 的 subscribe() 的时候,使用的是 lift() 所返回的新的 Observable ,于是它所触发的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那个 OnSubscribe;
  4. 而这个新 OnSubscribe 的 call() 方法中的 onSubscribe ,就是指的原始 Observable 中的原始 OnSubscribe ,在这个 call() 方法里,新 OnSubscribe 利用 operator.call(subscriber) 生成了一个新的 Subscriber(Operator 就是在这里,通过自己的 call() 方法将新 Subscriber 和原始 Subscriber 进行关联,并插入自己的『变换』代码以实现变换),然后利用这个新 Subscriber 向原始 Observable 进行订阅。

这样就实现了 lift() 过程,有点像一种代理机制,通过事件拦截和处理实现事件序列的变换。

Scheduler 的原理

  • subscribeOn:在OnSubscriberB的call方法切换线程并构建newSubscriber
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// subscriberOn的核心代码
public final Observable<T> subscribeOn(Scheduler scheduler) {
return Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
final Scheduler.Worker inner = scheduler.createWorker();
inner.schedule(new Action0() {
@Override
public void call() {
Subscriber<T> newSubscriber = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onCompleted() {
subscriber.onCompleted();
}
};
newSubscriber.onStart();
onSubscribe.call(newSubscriber);
}
});
}
});
}
  • observeOn:比较复杂,不贴代码了,过程就是一个lift操作,提供了一个中间的ObserveOnSubscriber,每当ObserveOnSubscriber收到一个onNext或其他消息时,就调用Worker在相应的线程中处理这条消息

  • 当多个 subscribeOn() 和 observeOn() 混合使用时

    图中共有 5 处含有对事件的操作。由图中可以看出,①和②两处受第一个 subscribeOn() 影响,运行在红色线程;③和④处受第一个 observeOn() 的影响,运行在绿色线程;⑤处受第二个 onserveOn() 影响,运行在紫色线程;而第二个 subscribeOn() ,由于在通知过程中线程就被第一个 subscribeOn() 截断,因此对整个流程并没有任何影响。这里也就回答了前面的问题:当使用了多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用。

参考链接

给 Android 开发者的 RxJava 详解