RxJava observeOn()与subscribeOn()的关系
Posted 一叶飘舟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava observeOn()与subscribeOn()的关系相关的知识,希望对你有一定的参考价值。
RxJava系列教程:
1. RxJava使用介绍 【视频教程】
2. RxJava操作符
• Creating Observables(Observable的创建操作符) 【视频教程】
• Transforming Observables(Observable的转换操作符) 【视频教程】
• Filtering Observables(Observable的过滤操作符) 【视频教程】
• Combining Observables(Observable的组合操作符) 【视频教程】
• Error Handling Operators(Observable的错误处理操作符) 【视频教程】
• Observable Utility Operators(Observable的辅助性操作符) 【视频教程】
• Conditional and Boolean Operators(Observable的条件和布尔操作符) 【视频教程】
• Mathematical and Aggregate Operators(Observable数学运算及聚合操作符) 【视频教程】
• 其他如observable.toList()、observable.connect()、observable.publish()等等; 【视频教程】
3. RxJava Observer与Subcriber的关系 【视频教程】
4. RxJava线程控制(Scheduler) 【视频教程】
5. RxJava 并发之数据流发射太快如何办(背压(Backpressure)) 【视频教程】
observeOn和subscribeOn都是对observable的一种操作,区别就是subscribeOn改变了observable本身产生事件的schedule以及发出事件后相关处理事件的程序所在的scheduler,而obseveron仅仅是改变了对发出事件后相关处理事件的程序所在的scheduler。
或许你会问,这有多大的区别吗?的确是有的,比如说产生observable事件是一件费时可能会卡主线程的操作(比如说获取网络数据),那么subscribeOn就是你的选择,这样可以避免卡住主线程。
两者最主要的差别是影响的范围不同,observeOn is more limited,但是却是可以多次调用,多次改变不同的接受者所在的scheduler,在调用这个函数之后的observable造成影响。而subscribeOn则是一次性的,无论在什么地方调用,总是从改变最原始的observable开始影响整个observable的处理。
subscribeOn()和observeOn()的区别
- subscribeOn()主要改变的是订阅的线程,即call()执行的线程;
- observeOn()主要改变的是发送的线程,即onNext()执行的线程。
subscribeOn
我们先看一个例子。
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("a");
subscriber.onNext("b");
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String integer) {
System.out.println(integer);
}
});
运行如下:
a
b
我们看一下subscribeOn()中,都干了什么
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
很明显,会走if之外的方法。
在这里我们可以看到,我们又创建了一个Observable对象,但创建时传入的参数为OperatorSubscribeOn(this,scheduler),我们看一下此对象以及其对应的构造方法
OperatorSubscribeOn代码:
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
final Scheduler scheduler;
final Observable<T> source;
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}
@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
};
source.unsafeSubscribe(s);
}
});
}
}
可以看到,OperatorSubscribeOn实现Onsubscribe,并且由其构造方法可知,他保存了上一个Observable对象,并保存了Scheduler对象。
这里做个总结。
把Observable.create()创建的称之为Observable_1,OnSubscribe_1。把subscribeOn()创建的称之为Observable_2,OnSubscribe_2(= OperatorSubscribeOn)。
那么,前两步就是创建了两个的observable,和OnSubscribe,并且OnSubscribe_2中保存了Observable_1的应用,即source。
调用Observable_2.subscribe()方法会调用OnSubscibe_2的call方法,即OperatorSubscribeOn的call()。
下面分析下call()方法。
- inner.schedule()改变了线程,此时Action的call()在我们指定的线程中运行。
- Subscriber被包装了一层。
- source.unsafeSubscribe(s);,注意source是Observable_1对象。
unsafeSubscribe方法代码:
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
// new Subscriber so onStart it
subscriber.onStart();
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
return Subscriptions.unsubscribed();
}
}
代码很多,关键代码:
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
该方法即调用了OnSubscribe_1.call()方法。注意,此时的call()方法在我们指定的线程中运行。那么就起到了改变线程的作用。
对于以上线程,我们可以总结,其有如下流程:
- Observable.create() : 创建了Observable_1和OnSubscribe_1;
- subscribeOn(): 创建Observable_2和OperatorSubscribeOn(OnSubscribe_2),同时OperatorSubscribeOn保存了Observable_1的引用。
- observable_2.subscribe(Observer):
- 调用OperatorSubscribeOn的call()。call()改变了线程的运行,并且调用了Observable_1.unsafeSubscribe(s);
- Observable_1.unsafeSubscribe(s);,该方法的实现中调用了OnSubscribe_1的call()。
从这个可以了解,无论我们的subscribeOn()放在哪里,他改变的是subscribe()的过程,而不是onNext()的过程。
那么如果有多个subscribeOn(),那么线程会怎样执行呢。如果按照我们的逻辑,有以下程序
Observable.just("ss")
.subscribeOn(Schedulers.io()) // ----1---
.subscribeOn(Schedulers.newThread()) //----2----
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
}
});
那么,我们根据之前的源码分析其执行逻辑。
- Observable.just(“ss”),创建Observable_1,OnSubscribe_1
- Observable_1.subscribeOn(Schedulers.io()):创建observable_2,OperatorSubscribeOn_2并保存Observable_1的引用。
- observable_2.subscribeOn(Schedulers.newThread()):创建Observable_3,OperatorSubscribeOn_3并保存Observable_2的引用。
- Observable_3.subscribe():
- 调用OperatorSubscribeOn_3.call(),改变线程为Schedulers.newThread()。
- 调用OperatorSubscribeOn_2.call(),改变线程为Schedulers.io()。
- 调用OnSubscribe_1.call(),此时call()运行在Schedulers.io()。
根据以上逻辑分析,会按照1的线程进行执行。
subscribeOn如何工作,关键代码其实就是一行代码:
source.unsafeSubscribe(s);
注意它所在的位置,是在worker的call里面,说白了,就是把source.subscribe这一行调用放在指定的线程里,那么总结起来的结论就是:
subscribeOn的调用,改变了调用前序列所运行的线程。
observeOn
看一下observeOn()源码:
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, RxRingBuffer.SIZE);
}
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
return observeOn(scheduler, false, bufferSize);
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
这里引出了新的操作符lift
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
这里不再介绍了,详见:http://blog.csdn.net/jdsjlzx/article/details/51686152
在lift()中,有如下关键代码:
Subscriber<? super T> st = hook.onLift(operator).call(o);
OperatorObserveOn.call()核心代码:
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}
我们看到其返回了ObserveOnSubscriber< T>,注意:此时只调用了call()方法,但call()方法中并没有改变线程的操作,此时为subscribe()过程。
我们直奔重点,因为,我们了解到其改变的是onNext()过程,那么我们肯定要看一下ObserveOnSubscriber.onNext()找找在哪改变线程
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
这里做了两件事,首先把结果缓存到一个队列里,然后调用schedule启动传入的worker
我们这里需要注意下:
在调用observeOn前的序列,把结果传入到onNext就是它的工作,它并不关心后续的流程,所以工作就到这里就结束了,剩下的交给ObserveOnSubscriber继续。
onNext方法最后调用了schedule(),从方法名可以看到,其肯定是改变线程用的,并且该方法经过一番循环之后,调用了该类的call()方法。
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}
recursiveScheduler 就是之前我们传入的Scheduler,我们一般会在observeOn传入androidScheluders.mainThread()。
scheduler中调用的call()方法
// only execute this from schedule()
@Override
public void call() {
long missed = 1L;
long currentEmission = emitted;
// these are accessed in a tight loop around atomics so
// loading them into local variables avoids the mandatory re-reading
// of the constant fields
final Queue<Object> q = this.queue;
final Subscriber<? super T> localChild = this.child;
final NotificationLite<T> localOn = this.on;
// requested and counter are not included to avoid JIT issues with register spilling
// and their access is is amortized because they are part of the outer loop which runs
// less frequently (usually after each bufferSize elements)
for (;;) {
long requestAmount = requested.get();
while (requestAmount != currentEmission) {
boolean done = finished;
Object v = q.poll();
boolean empty = v == null;
if (checkTerminated(done, empty, localChild, q)) {
return;
}
if (empty) {
break;
}
localChild.onNext(localOn.getValue(v));
currentEmission++;
if (currentEmission == limit) {
requestAmount = BackpressureUtils.produced(requested, currentEmission);
request(currentEmission);
currentEmission = 0L;
}
}
if (requestAmount == currentEmission) {
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
return;
}
}
emitted = currentEmission;
missed = counter.addAndGet(-missed);
if (missed == 0L) {
break;
}
}
}
call()中有localChild.onNext(localOn.getValue(v));调用。
在Scheduler启动后, 我们在Observable.subscribe(a)传入的a就是这里的child, 我们看到,在call中终于调用了它的onNext方法,把真正的结果传了出去,但是在这里,我们是工作在observeOn的线程上的。
总结起来的结论就是:
- observeOn 对调用之前的序列默不关心,也不会要求之前的序列运行在指定的线程上
- observeOn 对之前的序列产生的结果先缓存起来,然后再在指定的线程上,推送给最终的subscriber
observeOn改变的是onNext()调用。
subcribeOn和observeOn 对比分析
Observable
.map // 操作1
.flatMap // 操作2
.subscribeOn(io)
.map //操作3
.flatMap //操作4
.observeOn(main)
.map //操作5
.flatMap //操作6
.subscribeOn(io) //!!特别注意
.subscribe(handleData)
有如上逻辑,则我们对其运行进行分析。
首先,我们需要先明白其内部执行的逻辑。
在调用subscribe之后,逻辑开始运行。分别调用每一步OnSubscribe.call(),注意:自下往上。当运行到最上,即Observable.create()后,我们在其中调用了subscriber.onNext(),于是程序开始自上往下执行每一个对象的subscriber.onNext()方法。最终,直到subscribe()中的回调。
其次,从上面对subscribeOn()和observeOn()的分析中可以明白,subscribeOn()是在call()方法中起作用,而observeOn()是在onNext()中作用。
那么对于以上的逻辑,我们可以得出如下结论:
- 操作1,2,3,4在io线程中,因为在如果没有observeOn()影响,他们的回调操作默认在订阅的线程中。而我们的订阅线程在subscribeOn(io)发生了改变。注意他们执行的先后顺序。
- 操作5,6在main线程中运行。因为observeOn()改变了onNext().
- 特别注意那一个逻辑没起到作用
再简单点总结就是
- subscribeOn的调用切换之前的线程。
- observeOn的调用切换之后的线程。
- observeOn之后,不可再调用subscribeOn 切换线程
复杂情况
我们经常多次使用subscribeOn切换线程,那么以后是否可以组合observeOn和subscribeOn达到自由切换的目的呢?
组合是可以的,但是他们的执行顺序是有条件的,如果仔细分析的话,可以知道observeOn调用之后,再调用subscribeOn是无效的,原因是什么?
因为subscribeOn改变的是subscribe这句调用所在的线程,大多数情况,产生内容和消费内容是在同一线程的,所以改变了产生内容所在的线程,就改变了消费内容所在的线程。
经过上面的阐述,我们知道,observeOn的工作原理是把消费结果先缓存,再切换到新线程上让原始消费者消费,它和生产者是没有一点关系的,就算subscribeOn调用了,也只是改变observeOn这个消费者所在的线程,和OperatorObserveOn中存储的原始消费者一点关系都没有,它还是由observeOn控制。
@扔物线 大神给的总结:
- 下面提到的“操作”包括产生事件、用操作符操作事件以及最终的通过 subscriber 消费事件;
- 只有第一subscribeOn() 起作用(所以多个 subscribeOn() 无意义);
- 这个 subscribeOn() 控制从流程开始的第一个操作,直到遇到第一个 observeOn();
- observeOn() 可以使用多次,每个 observeOn() 将导致一次线程切换(),这次切换开始于这次 observeOn() 的下一个操作;
- 不论是 subscribeOn() 还是 observeOn(),每次线程切换如果不受到下一个 observeOn() 的干预,线程将不再改变,不会自动切换到其他线程。
参考文章:
https://segmentfault.com/a/1190000004856071
http://blog.csdn.net/lisdye2/article/details/51113837
最后需要注意的是:
默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。
想要了解更多请参考博客:RxJava中的doOnSubscribe默认执行线程分析
这个(学习总结)博客花了3个小时多,看源码真的很头疼,希望以后能有所提高。
以上是关于RxJava observeOn()与subscribeOn()的关系的主要内容,如果未能解决你的问题,请参考以下文章
RxJava observeOn()与subscribeOn()的关系
尽管正确调用了observeOn(io线程),但RxJava NetworkOnMainThreadException
RxJava之七——RxJava 2.0 图文分析create() subscribe()map()observeOn()subscribeOn()源码