一张图搞定-RxJava2的线程切换原理和内存泄露问题

Posted ccx-_-

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一张图搞定-RxJava2的线程切换原理和内存泄露问题相关的知识,希望对你有一定的参考价值。

一张图搞定-RxJava2的线程切换原理和内存泄露问题分析

首先祭出我自己画的一张图


这张图显示的是RxJava2源码层面上的调用关系

下面通过一个例子来解释一下这张图

public class MainActivity extends Activity 
    private static final String TAG = "MainActivity";

    CompositeDisposable comDisposable = new CompositeDisposable();
    Bitmap bitmap;

    @Override
    protected void onCreate(Bundle savedInstanceState) 
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() 
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception 
                emitter.onNext("hello");
            
        )
          .subscribeOn(Schedulers.io())
          .observeOn(androidSchedulers.mainThread());
        Disposable disposable = observable.subscribe(new Consumer<String>() 
            @Override
            public void accept(@NonNull String s) throws Exception 
                Log.i(TAG,s);
            
        );
        comDisposable.add(disposable);
        bitmap = BitmapFactory.decodeResource(getResources(),R.mipmap.aaa);
    

    @Override
    protected void onDestroy() 
        super.onDestroy();
        //comDisposable.dispose();
    

这个例子很简单,在io线程里发送一个字符串,然后在主线程中打印出来.这里面我们加了一个bitmap对象,这个对象里面保持了一个大图片,我们通过观察内存占用,
就可以分析此Activity是否内存泄露,没有被垃圾回收.

首先从Observable.create方法开始分析

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) 
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));

生成了一个ObservableCreate对象,此对象中传入了一个ObservableOnSubscribe对象,而这个ObservableOnSubscribe对象是我们实现的一个内部类

new ObservableOnSubscribe<String>() 
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception 
        emitter.onNext("hello");
    

这个ObservableOnSubscribe的构造方法定义为

public ObservableCreate(ObservableOnSubscribe<T> source) 
    this.source = source;

将ObservableOnSubscribe对象赋值给了source,在图中可以看到ObservableCreate里面的source的颜色和ObservableOnSubscribe的颜色相同都是黄色,这个
图中,我特意将相同的对象置为了相同的颜色,从ObservableCreate --> ObservableSubscribeOn --> ObservableObserveOn,从上到下,依次持有上层的对象,用的是装饰者模式,像极了Java的流对象.

ObservableSubscribeOn是执行subscribeOn方法生成的,代码如下

public final Observable<T> subscribeOn(Scheduler scheduler) 
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));

ObservableObserveOn是执行observeOn方法生成的,代码如下

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) 
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));

也就是下面这段代码最终得到了一个ObservableObserveOn对象

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() 
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception 
        emitter.onNext("hello");
    
)
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread());

OK,这就是图的左半边的对象生成逻辑,也许你会问每一个对象里的source是干什么用的?,别着急接下来你就会明白了.

来看subscribe操作,代码如下

Disposable disposable = observable.subscribe(new Consumer<String>() 
    @Override
    public void accept(@NonNull String s) throws Exception 
        Log.i(TAG,s);
    
);

这个subscribe方法中我们传了一个onNext(consumer对象)操作,进入subscribe源码里

public final Disposable subscribe(Consumer<? super T> onNext) 
    return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());

其实这个subscribe里是可以传四个参数的分别是onNext,onError,onComplete,onSubscribe,我们这里只传了一个onNext,其他操作源码传入了默认操作.

接着向里面跟源码:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete, Consumer<? super Disposable> onSubscribe) 
    ObjectHelper.requireNonNull(onNext, "onNext is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

    LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

    subscribe(ls);

    return ls;

可以看到new了一个LambdaObserver对象,并传入了 onNext, onError, onComplete, onSubscribe.之后调用subscribe方法,
此subscribe方法是ObservableObserveOn对象的,最终会调用ObservableObserveOn的subscribeActual方法,代码如下:

@Override
protected void subscribeActual(Observer<? super T> observer) 
    if (scheduler instanceof TrampolineScheduler) 
        source.subscribe(observer);
     else 
        Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    

让我们结合图来分析一下,这个方法调用的是source.subscribe并传入一个ObserveOnObserver对象






从图中我们可以看到ObservableObserveOn对象里的source是绿色的,是一个ObservableSubscribeOn对象,调用其subscribe方法并传入一个ObserveOnObserver对象(图中浅绿色),这个ObserveOnOnserver对象中有一个actual对象,此对象为LambdaObserver(图中紫色),从上面的分析中我们知道LambdaObserver对象中保存着我们的定义的onNext操作.

注意,接下来比较重要,要出现线程切换了:

因为调用了ObservableSubscribeOn的subscribe方法,最终会调用subscribeActual方法,代码如下

@Override
public void subscribeActual(final Observer<? super T> s) 
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

    s.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

我们在知道ObservableSubscribeOn对象时调用subscribeOn(Schedulers.io())方法是生成的对象,上面的scheduler对象就是Schedulers.io()生成的一个线程调度对象,此对象是维护这一个线程池,让操作在io线程池中执行(关于io线程池,大家可以自己百度:) ),也就是此方法 scheduler.scheduleDirect(new SubscribeTask(parent))会切换线程来执行SubscribeTask任务,此任务的定义为:

final class SubscribeTask implements Runnable 
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) 
        this.parent = parent;
    

    @Override
    public void run() 
        source.subscribe(parent);
    

其中parent就是一个SubscribeOnObserver对象,soure就是一个ObservableCreate对象,如图:






SubscribeOnObserver对象中也有一个actual对象(浅绿色),从图中可看到是从下面传上来的一个ObserveOnObserver对象.

执行source.subscribe(parent) (也就是ObservableCreate.subscribe(ObserveOnObserver))时,是在io线程中进行的,如图所示切换了线程.这里其实可以解释一个问题 subscribeOn如果执行多次为什么只有第一次有作用?,每一次subscribeOn都会切换线程,从图中我们可以看到,这个切换线程是倒着的从下往上,也就是一直切换到最后一个subscribeOn,其实就是我们代码中定义的第一个subscribeOn,也许你会问,从下往上是subscribeOn切换线程,那如果要调用我们自己定义的onNext,最终不是还要再从上往下调用回去,调用回去时还会切换线程么?别着急,一会就会讲到从上往下调用回去的逻辑,这里我可以先说一个概括 调用subscribe时的从下往上是subscribeOn切换线程,之后调用onNext传递数据时的从上往下是ObserveOn切换线程

接着往下,在io线程里调用ObservableCreate.subscribe(ObserveOnObserver),最终同样调用到subscribeActual方法,代码如下

@Override
protected void subscribeActual(Observer<? super T> observer) 
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try 
        source.subscribe(parent);
     catch (Throwable ex) 
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    

同样的调用了source.subscribe方法,这里的source是ObservableOnSubscribe对象,parent是一个CreateEmitter对象,如下图:






从图中可以看到CreateEmitter中有一个observer对象(深蓝色),此对象是传上来的SubscribeOnObserver对象,Ok终于调到头了,最终调用ObservableOnSubscribe的subscribe方法,这个ObservableOnSubscribe就是我们在Activity中自己实现的一个匿名内部类:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() 
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception 
                emitter.onNext("hello");
            
        )

豁然开朗有木有,终于到了数据产生的源头了,并且这个方法是在io线程里执行的,准确的说是第一个subscribeOn方法所指定的线程,这里我们只分析onNext方法,其他onError,onComplete分析方法是相同的.
  接着来分析一下onNext方法的传递.首先从我们的调用开始emitter.onNext(“hello”);,这里的emitter其实就是上面我们创建的CreateEmitter对象,调用其onNext()方法,代码如下:

@Override
public void onNext(T t) 
    if (t == null) 
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    
    if (!isDisposed()) 
        observer.onNext(t);
    

调用了observer的onNext方法,从我们的流程图中我们可以看到,observer其实是一个SubscribeOnObserver,也就是调用SubscribeOnObserver的onNext方法,代码如下:

@Override
       public void onNext(T t) 
           actual.onNext(t);
       

调用actual的onNext方法,从流程图中我们可以看到,这个acutal是一个ObserveObObserver对象,接着来看ObserveObObserver的onNext方法:

@Override
public void onNext(T t) 
    if (done) 
        return;
    

    if (sourceMode != QueueDisposable.ASYNC) 
        queue.offer(t);
    
    schedule();

关键的来了,调用了schedule()方法,从名字看是要切换线程了,

void schedule() 
           if (getAndIncrement() == 0) 
               worker.schedule(this);
           
       

       void drainNormal() 
           int missed = 1;

           final SimpleQueue<T> q = queue;
           final Observer<? super T> a = actual;

           for (;;) 
           boolean d = done;
                T v;
                try 
                    v = q.poll();
                 catch (Throwable ex) 
                  ....
                
                   a.onNext(v);
               
              ......
           
       

schedule()方法调用worker.schedule(this);此方法其实就是在将线程切换到主线程(也就是我们ObserveOn(AndroidSchedulers.mainThread())所指定的线程),执行drainNormal方法,
drainNormal()方法调用a.onNext(v),a 在

final Observer<? super T> a = actual;

中被赋值为actual,这个actual是不是很熟悉,没错在流程图中我们可以看到actual是一个LambdaObserver





结束了,还记得么LambdaObserver的onNext方法是我们定义的,终于完成了这么一条调用链,看着是挺复杂的,但结合着图理解,就很简单了:)

总结RxJava2的切换原理是 调用subscribe时的从下往上是subscribeOn切换线程,之后调用onNext传递数据时的从上往下是ObserveOn切换线程.



接下来我们来结合着流程图来看看可能存在内存泄露的地方,我们都知道Java中非静态内部类持有外部类的引用,这是导致内存泄露的常见形式:内部类因为某些原因不能被释放,导致它所持有的外部类也得不到释放.来看看我们demo中定义的RxJava代码:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() 
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception 
        emitter.onNext("hello");
    
)
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread());
Disposable disposable = observable.subscribe(new Consumer<String>() 
    @Override
    public void accept(@NonNull String s) throws Exception 
        Log.i(TAG,s);
    
);

我们定义了两个内部类:ObservableOnSubscribe和Consumer,我们想象一种情况,如果io线程里执行的ObservableOnSubscribe的onNext方法阻塞住了一直没有返回,导致此对象不能被回收,如下面代码:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() 
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception 
                Thread.sleep(10000000);
                emitter.onNext("hello");
            
        )

代码中subscribe方法sleep住了,ObservableOnSubscribe对象不能被回收,导致外层的Activity也不能被垃圾回收器回收,因为我们的Activity中持有了一个Bitmap对象,当我们退出此Activity,从内存监测中也能看到内存占用并没有被释放.如图:



查看前一定要点一下左上角的卡车图标,强制GC一下…

你也许看到问题了,我们没有在onDestroy方法里调用 comDisposable.dispose();方法,好我们现在调用此方法,然后查看结果.OK完美内存占用消失了,如图:





控制台打印了 I/MainActivity: thread interrupted,任务取消的原因是调用的线程的interrupt()方法来实现的,等等这里会有一个问题,如果我们捕获了InterruptedException异常但是并不退出这个任务,或者说Interrupt()方法并不能打断这个任务呢,如下面代码:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() 
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception 

        int num = 1000;
        while(num > 0) 
            try 
                Thread.sleep(10000000);
             catch (InterruptedException e) 
                e.printStackTrace();
                Log.i(TAG, "thread interrupted");
            
        
        emitter.onNext("hello");
    
)

这样呢…看你还怎么打断-_-






从截图中可以看到内存没有释放,也就是Activity不能被回收,这就是第一个产生内存泄露的地方内部类ObservableOnSubscribe无法立刻垃圾回收导致外层Activity不能被释放,
有经验的java程序员一定会跳出来说要把ObservableOnSubscribe声明为静态的,好吧,让我们试试

static ObservableOnSubscribe<String> observableOnSubscribe = new ObservableOnSubscribe<String>() 
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception 

            int num = 1000;
            while(num > 0) 
                try 
                    Thread.sleep(10000000);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                    Log.i(TAG, "thread interrupted");
                
            
            emitter.onNext("hello");
        
    ;

    CompositeDisposable comDisposable = new Composite

以上是关于一张图搞定-RxJava2的线程切换原理和内存泄露问题的主要内容,如果未能解决你的问题,请参考以下文章

RxJava2探索-线程切换原理之subscribeOn

RxJava2探索-线程切换原理之subscribeOn

5000字12 连环炮一张图快速搞定线程池

RxJava2线程切换原理分析

RxJava2探索-线程切换原理之observeOn

RxJava2探索-线程切换原理之observeOn