一张图搞定-RxJava2的线程切换原理和内存泄露问题
Posted ccx-_-
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一张图搞定-RxJava2的线程切换原理和内存泄露问题相关的知识,希望对你有一定的参考价值。
一张图搞定-RxJava2的线程切换原理和内存泄露问题分析
首先祭出我自己画的一张图
这张图显示的是RxJava2源码层面上的调用关系
下面通过一个例子来解释一下这张图
public class MainActivity extends Activity
private static final String TAG = "MainActivity";
CompositeDisposable comDisposable = new CompositeDisposable();
Bitmap bitmap;
@Override
protected void onCreate(Bundle savedInstanceState)
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>()
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception
emitter.onNext("hello");
)
.subscribeOn(Schedulers.io())
.observeOn(androidSchedulers.mainThread());
Disposable disposable = observable.subscribe(new Consumer<String>()
@Override
public void accept(@NonNull String s) throws Exception
Log.i(TAG,s);
);
comDisposable.add(disposable);
bitmap = BitmapFactory.decodeResource(getResources(),R.mipmap.aaa);
@Override
protected void onDestroy()
super.onDestroy();
//comDisposable.dispose();
这个例子很简单,在io线程里发送一个字符串,然后在主线程中打印出来.这里面我们加了一个bitmap对象,这个对象里面保持了一个大图片,我们通过观察内存占用,
就可以分析此Activity是否内存泄露,没有被垃圾回收.
首先从Observable.create方法开始分析
public static <T> Observable<T> create(ObservableOnSubscribe<T> source)
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
生成了一个ObservableCreate对象,此对象中传入了一个ObservableOnSubscribe对象,而这个ObservableOnSubscribe对象是我们实现的一个内部类
new ObservableOnSubscribe<String>()
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception
emitter.onNext("hello");
这个ObservableOnSubscribe的构造方法定义为
public ObservableCreate(ObservableOnSubscribe<T> source)
this.source = source;
将ObservableOnSubscribe对象赋值给了source,在图中可以看到ObservableCreate里面的source的颜色和ObservableOnSubscribe的颜色相同都是黄色,这个
图中,我特意将相同的对象置为了相同的颜色,从ObservableCreate --> ObservableSubscribeOn --> ObservableObserveOn,从上到下,依次持有上层的对象,用的是装饰者模式,像极了Java的流对象.
ObservableSubscribeOn是执行subscribeOn方法生成的,代码如下
public final Observable<T> subscribeOn(Scheduler scheduler)
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
ObservableObserveOn是执行observeOn方法生成的,代码如下
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize)
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
也就是下面这段代码最终得到了一个ObservableObserveOn对象
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>()
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception
emitter.onNext("hello");
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
OK,这就是图的左半边的对象生成逻辑,也许你会问每一个对象里的source是干什么用的?,别着急接下来你就会明白了.
来看subscribe操作,代码如下
Disposable disposable = observable.subscribe(new Consumer<String>()
@Override
public void accept(@NonNull String s) throws Exception
Log.i(TAG,s);
);
这个subscribe方法中我们传了一个onNext(consumer对象)操作,进入subscribe源码里
public final Disposable subscribe(Consumer<? super T> onNext)
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
其实这个subscribe里是可以传四个参数的分别是onNext,onError,onComplete,onSubscribe,我们这里只传了一个onNext,其他操作源码传入了默认操作.
接着向里面跟源码:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe)
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
可以看到new了一个LambdaObserver对象,并传入了 onNext, onError, onComplete, onSubscribe.之后调用subscribe方法,
此subscribe方法是ObservableObserveOn对象的,最终会调用ObservableObserveOn的subscribeActual方法,代码如下:
@Override
protected void subscribeActual(Observer<? super T> observer)
if (scheduler instanceof TrampolineScheduler)
source.subscribe(observer);
else
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
让我们结合图来分析一下,这个方法调用的是source.subscribe并传入一个ObserveOnObserver对象
从图中我们可以看到ObservableObserveOn对象里的source是绿色的,是一个ObservableSubscribeOn对象,调用其subscribe方法并传入一个ObserveOnObserver对象(图中浅绿色),这个ObserveOnOnserver对象中有一个actual对象,此对象为LambdaObserver(图中紫色),从上面的分析中我们知道LambdaObserver对象中保存着我们的定义的onNext操作.
注意,接下来比较重要,要出现线程切换了:
因为调用了ObservableSubscribeOn的subscribe方法,最终会调用subscribeActual方法,代码如下
@Override
public void subscribeActual(final Observer<? super T> s)
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
我们在知道ObservableSubscribeOn对象时调用subscribeOn(Schedulers.io())方法是生成的对象,上面的scheduler对象就是Schedulers.io()生成的一个线程调度对象,此对象是维护这一个线程池,让操作在io线程池中执行(关于io线程池,大家可以自己百度:) ),也就是此方法 scheduler.scheduleDirect(new SubscribeTask(parent))会切换线程来执行SubscribeTask任务,此任务的定义为:
final class SubscribeTask implements Runnable
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent)
this.parent = parent;
@Override
public void run()
source.subscribe(parent);
其中parent就是一个SubscribeOnObserver对象,soure就是一个ObservableCreate对象,如图:
SubscribeOnObserver对象中也有一个actual对象(浅绿色),从图中可看到是从下面传上来的一个ObserveOnObserver对象.
执行source.subscribe(parent) (也就是ObservableCreate.subscribe(ObserveOnObserver))时,是在io线程中进行的,如图所示切换了线程.这里其实可以解释一个问题 subscribeOn如果执行多次为什么只有第一次有作用?,每一次subscribeOn都会切换线程,从图中我们可以看到,这个切换线程是倒着的从下往上,也就是一直切换到最后一个subscribeOn,其实就是我们代码中定义的第一个subscribeOn,也许你会问,从下往上是subscribeOn切换线程,那如果要调用我们自己定义的onNext,最终不是还要再从上往下调用回去,调用回去时还会切换线程么?别着急,一会就会讲到从上往下调用回去的逻辑,这里我可以先说一个概括 调用subscribe时的从下往上是subscribeOn切换线程,之后调用onNext传递数据时的从上往下是ObserveOn切换线程
接着往下,在io线程里调用ObservableCreate.subscribe(ObserveOnObserver),最终同样调用到subscribeActual方法,代码如下
@Override
protected void subscribeActual(Observer<? super T> observer)
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try
source.subscribe(parent);
catch (Throwable ex)
Exceptions.throwIfFatal(ex);
parent.onError(ex);
同样的调用了source.subscribe方法,这里的source是ObservableOnSubscribe对象,parent是一个CreateEmitter对象,如下图:
从图中可以看到CreateEmitter中有一个observer对象(深蓝色),此对象是传上来的SubscribeOnObserver对象,Ok终于调到头了,最终调用ObservableOnSubscribe的subscribe方法,这个ObservableOnSubscribe就是我们在Activity中自己实现的一个匿名内部类:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>()
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception
emitter.onNext("hello");
)
豁然开朗有木有,终于到了数据产生的源头了,并且这个方法是在io线程里执行的,准确的说是第一个subscribeOn方法所指定的线程,这里我们只分析onNext方法,其他onError,onComplete分析方法是相同的.
接着来分析一下onNext方法的传递.首先从我们的调用开始emitter.onNext(“hello”);,这里的emitter其实就是上面我们创建的CreateEmitter对象,调用其onNext()方法,代码如下:
@Override
public void onNext(T t)
if (t == null)
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
if (!isDisposed())
observer.onNext(t);
调用了observer的onNext方法,从我们的流程图中我们可以看到,observer其实是一个SubscribeOnObserver,也就是调用SubscribeOnObserver的onNext方法,代码如下:
@Override
public void onNext(T t)
actual.onNext(t);
调用actual的onNext方法,从流程图中我们可以看到,这个acutal是一个ObserveObObserver对象,接着来看ObserveObObserver的onNext方法:
@Override
public void onNext(T t)
if (done)
return;
if (sourceMode != QueueDisposable.ASYNC)
queue.offer(t);
schedule();
关键的来了,调用了schedule()方法,从名字看是要切换线程了,
void schedule()
if (getAndIncrement() == 0)
worker.schedule(this);
void drainNormal()
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;)
boolean d = done;
T v;
try
v = q.poll();
catch (Throwable ex)
....
a.onNext(v);
......
schedule()方法调用worker.schedule(this);此方法其实就是在将线程切换到主线程(也就是我们ObserveOn(AndroidSchedulers.mainThread())所指定的线程),执行drainNormal方法,
drainNormal()方法调用a.onNext(v),a 在
final Observer<? super T> a = actual;
中被赋值为actual,这个actual是不是很熟悉,没错在流程图中我们可以看到actual是一个LambdaObserver
结束了,还记得么LambdaObserver的onNext方法是我们定义的,终于完成了这么一条调用链,看着是挺复杂的,但结合着图理解,就很简单了:)
总结RxJava2的切换原理是 调用subscribe时的从下往上是subscribeOn切换线程,之后调用onNext传递数据时的从上往下是ObserveOn切换线程.
接下来我们来结合着流程图来看看可能存在内存泄露的地方,我们都知道Java中非静态内部类持有外部类的引用,这是导致内存泄露的常见形式:内部类因为某些原因不能被释放,导致它所持有的外部类也得不到释放.来看看我们demo中定义的RxJava代码:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>()
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception
emitter.onNext("hello");
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
Disposable disposable = observable.subscribe(new Consumer<String>()
@Override
public void accept(@NonNull String s) throws Exception
Log.i(TAG,s);
);
我们定义了两个内部类:ObservableOnSubscribe和Consumer,我们想象一种情况,如果io线程里执行的ObservableOnSubscribe的onNext方法阻塞住了一直没有返回,导致此对象不能被回收,如下面代码:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>()
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception
Thread.sleep(10000000);
emitter.onNext("hello");
)
代码中subscribe方法sleep住了,ObservableOnSubscribe对象不能被回收,导致外层的Activity也不能被垃圾回收器回收,因为我们的Activity中持有了一个Bitmap对象,当我们退出此Activity,从内存监测中也能看到内存占用并没有被释放.如图:
查看前一定要点一下左上角的卡车图标,强制GC一下…
你也许看到问题了,我们没有在onDestroy方法里调用 comDisposable.dispose();方法,好我们现在调用此方法,然后查看结果.OK完美内存占用消失了,如图:
控制台打印了 I/MainActivity: thread interrupted,任务取消的原因是调用的线程的interrupt()方法来实现的,等等这里会有一个问题,如果我们捕获了InterruptedException异常但是并不退出这个任务,或者说Interrupt()方法并不能打断这个任务呢,如下面代码:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>()
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception
int num = 1000;
while(num > 0)
try
Thread.sleep(10000000);
catch (InterruptedException e)
e.printStackTrace();
Log.i(TAG, "thread interrupted");
emitter.onNext("hello");
)
这样呢…看你还怎么打断-_-
从截图中可以看到内存没有释放,也就是Activity不能被回收,这就是第一个产生内存泄露的地方内部类ObservableOnSubscribe无法立刻垃圾回收导致外层Activity不能被释放,
有经验的java程序员一定会跳出来说要把ObservableOnSubscribe声明为静态的,好吧,让我们试试
static ObservableOnSubscribe<String> observableOnSubscribe = new ObservableOnSubscribe<String>()
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception
int num = 1000;
while(num > 0)
try
Thread.sleep(10000000);
catch (InterruptedException e)
e.printStackTrace();
Log.i(TAG, "thread interrupted");
emitter.onNext("hello");
;
CompositeDisposable comDisposable = new Composite以上是关于一张图搞定-RxJava2的线程切换原理和内存泄露问题的主要内容,如果未能解决你的问题,请参考以下文章