AndroidRxjava2 Flowable详解与背压那些事

Posted 寒小枫

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AndroidRxjava2 Flowable详解与背压那些事相关的知识,希望对你有一定的参考价值。

1.Rxjava1中的背压

Rxjava2中有这么一个被观察者Flowable,同样作为被观察者,它和Observable有什么区别呢,在Rxjava2中,Observable不再支持背压,而新增的Flowable支持背压,何为背压,就是上游发送事件的速度大于下游处理事件的速度所产生的现象。

我们来看个例子,先把rxjava切换到rxjava1.0:

implementation 'io.reactivex:rxjava:1.1.6'
implementation 'io.reactivex:rxandroid:1.2.1'

然后执行如下代码:

        //被观察者在主线程中,每1ms发送一个事件
        Observable.interval(1, TimeUnit.MILLISECONDS)
                //观察者每1s才处理一个事件
                .subscribe(new Action1<Long>() 
                    @Override
                    public void call(Long aLong) 
                        try 
                            Thread.sleep(1000);
                         catch (InterruptedException e) 
                            e.printStackTrace();
                        
                        Log.w("tag", "---->" + aLong);
                    
                );

执行结果如下:

我特?说好的背压呢,说好的异常呢,不要慌,因为上面的代码是同步的情况,都是运行在祝线程的,所以同步的情况下,被观察者每发送一个事件,观察者就会处理一个事件,等观察者处理完当前事件后,被观察者才会继续发送事件,两者分工明确,恩爱和睦,不存在发送速度不一致的情况。

下面我们来看下异步的情况:

        //被观察者在主线程中,每1ms发送一个事件
        Observable.interval(1, TimeUnit.MILLISECONDS)
                .observeOn(Schedulers.newThread())
                //观察者在子线程中每1s处理一个事件
                .subscribe(new Action1<Long>() 
                    @Override
                    public void call(Long aLong) 
                        try 
                            Thread.sleep(1000);
                         catch (InterruptedException e) 
                            e.printStackTrace();
                        
                        Log.w("tag", "---->" + aLong);
                    
                );

运行后就会出现如下异常:

出现了背压的情况,抛出了MissingBackpressureException异常,异步情况下被观察者发送事件是比较暴力的,一次性全部发完,放在缓存池,然后观察者一条条慢慢去处理,发送过快就会出现背压的情况.

背压产生的条件:必须是异步的场景下才会出现,即被观察者和观察者处于不同的线程中。

rxjava1中默认的缓存池大小是16,当事件超过就会出现MissingBackpressureException,看如下例子:

    Observable.create(new Observable.OnSubscribe<String>() 
            @Override
            public void call(Subscriber<? super String> subscriber) 
                for (int i = 0; i < 17; i++) 
                    Log.w("tag", "send ----> i = " + i);
                    subscriber.onNext("i = "+i);
                
            
        )
                .subscribeOn(Schedulers.newThread())
                //将观察者的工作放在新线程环境中
                .observeOn(Schedulers.newThread())
                //观察者处理每1000ms才处理一个事件
                .subscribe(new Action1<String>() 
                    @Override
                    public void call(String value) 
                        try 
                            Thread.sleep(1000);
                         catch (InterruptedException e) 
                            e.printStackTrace();
                        
                        Log.w("tag", "---->" + value);
                    
                );

你看:

嗯,默认的缓存池为什么是16,这个问题问的好,因为人家rxjava给的默认值就是16啊,不信你看:

    public final <B> Observable<List<T>> buffer(Observable<B> boundary) 
        return buffer(boundary, 16);
    

rxjava1中也提供了处理背压的操作符onBackpressureBuffer和onBackpressureDrop,下面我们来简单看下onBackpressureBuffer:

        //被观察者在主线程中,每1ms发送一个事件
        Observable.create(new Observable.OnSubscribe<String>() 
            @Override
            public void call(Subscriber<? super String> subscriber) 
                for (int i = 0; i < 10000; i++) 
                    Log.w("tag", "send ----> i = " + i);
                    subscriber.onNext("i = "+i);
                
            
        )
                .onBackpressureBuffer()
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Action1<String>() 
                    @Override
                    public void call(String value) 
                        try 
                            Thread.sleep(1000);
                         catch (InterruptedException e) 
                            e.printStackTrace();
                        
                        Log.w("tag", "---->" + value);
                    
                );

运行结果如下:

其实onBackpressureBuffer也就是增加了缓存池的大小,这个值为Long.MAX_VALUE,当然我们也可以自己指定onBackpressureBuffer(size)的大小:

    Observable.create(new Observable.OnSubscribe<String>() 
            @Override
            public void call(Subscriber<? super String> subscriber) 
                for (int i = 0; i < 100; i++) 
                    Log.w("tag", "send ----> i = " + i);
                    subscriber.onNext("i = "+i);
                
            
        )
                .onBackpressureBuffer(100)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Action1<String>() 
                    @Override
                    public void call(String value) 
                        try 
                            Thread.sleep(1000);
                         catch (InterruptedException e) 
                            e.printStackTrace();
                        
                        Log.w("tag", "---->" + value);
                    
                );


onBackpressureDrop的作用是当观察者来不及处理事件的时候,会把事件给丢弃掉,而onBackpressureLatest操作符表示当被观察者Observable发出事件的速度比观察者消耗得要快,观察者会接收Observable最新发出的事件进行处理,这两种情况大家可以自行测试感受下。

从上面的例子可以看出,在rxjava1中,interval操作符默认是不支持背压的,我们来试试range操作符:

    Observable.range(1,10000)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Action1<Integer>() 
                    @Override
                    public void call(Integer value) 
                        try 
                            Thread.sleep(1000);
                         catch (InterruptedException e) 
                            e.printStackTrace();
                        
                        Log.w("tag", "---->" + value);
                    
                );

运行结果如下:

尼玛,竟然没有出现背压,纳尼?

表情包好像放错了,走错片场了,哈哈哈,难道range操作符有毛病,不应该啊,最后经过一番查找,发现问题在observeOn操作符上,observeOn这个操作符内部有一个缓冲区,Android环境下长度是16,它会告诉range最多发送16个事件,充满缓冲区即可。

这样可以看出,之前使用的interval操作符是不支持背压的,而range则支持背压,那么到底什么样的Observable支持背压或不支持背压呢?

其实在rxjava1中,不是所有Observable都支持背压,从上面的例子也可以看出来这一点,我们知道Observable有hot和cold之分,rxjava1中hot observable是不支持背压的,而cold observable中也有一部分不支持背压,这里不再深究,想继续了解可以自行google,另外一个原因是现在都tm Rxjava2了,我还在这扯rxjava1,罪过罪过,我也是为了引出问题。

简单扯一下解决背压的思路,无非是限制发送的速度,俗称限流,很多操作符都可以做到这些,比如sample在一段时间内只处理最后一个数据等,也可以使用rxjava1中提供的onBackpressureBuffer,onBackpressureDrop,onBackpressureLatest。

虽然rxjava1也有处理背压的方法,但设计并不完美,缓存池大小只有16,而且被观察者无法得知下游观察者对事件的处理速度,一次性把事件抛给了下游观察者,所以rxjava2中对背压进行了改进。

2.Rxjava2中的背压

Rxjava2中新增了一个被观察者Flowable用来专门支持背压,默认队列大小128,并且其所有的操作符都强制支持背压,先看个简单的例子:

        Flowable.create(new FlowableOnSubscribe<String>() 
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception 
                for (int i = 0;i < 1000000; i++) 
                    emitter.onNext("i = "+i);
                
            
        , BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() 
                    @Override
                    public void accept(String s) throws Exception 
                        Log.e("tag","----> "+s);
                    
                );

运行结果如下:

说好的支持背压呢,怎么这个熟悉的异常又出现了????

细心的同学肯定发现了,Flowable.create方法第二个参数BackpressureStrategy.ERROR,这个BackpressureStrategy类其实就是处理背压的策略类,看下这个类的源码:

public enum BackpressureStrategy 
    //不指定背压策略
    MISSING,
    //出现背压就抛出异常
    ERROR,
    //指定无限大小的缓存池,此时不会出现异常,但无限制大量发送会发生OOM
    BUFFER,
    //如果缓存池满了就丢弃掉之后发出的事件
    DROP,
    //在DROP的基础上,强制将最后一条数据加入到缓存池中
    LATEST

依次来看下这几种策略的区别吧!

MISSING

        Flowable.create(new FlowableOnSubscribe<String>() 
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception 
                for (int i = 0;i < 1000000; i++) 
                    emitter.onNext("i = "+i);
                
            
        , BackpressureStrategy.MISSING)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() 
                    @Override
                    public void accept(String s) throws Exception 
                        Log.e("tag","----> "+s);
                    
                );

不出所料,果然抛出了异常:

ERROR

BackpressureStrategy.ERROR上面已经测试过了,不再重复了,依然会报异常。

BUFFER

      Flowable.create(new FlowableOnSubscribe<String>() 
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception 
                for (int i = 0;i < 1000000; i++) 
                    emitter.onNext("i = "+i);
                
            
        , BackpressureStrategy.BUFFER)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() 
                    @Override
                    public void accept(String s) throws Exception 
                        Log.e("tag","----> "+s);
                    
                );

运行结果如下,确实不会出现背压异常了,但是内存占用嗖嗖的升高,数据量足够大足够快的时候,OOM指日可待,哈哈哈!!!

DROP

        Flowable.create(new FlowableOnSubscribe<String>() 
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception 
                for (int i = 0;i < 1000000; i++) 
                    emitter.onNext("i = "+i);
                
            
        , BackpressureStrategy.DROP)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() 
                    @Override
                    public void accept(String s) throws Exception 
                        Log.e("tag","----> "+s);
                    
                );

运行结果如下:

可以发现,在填充满了默认的128个大小的缓存池后,丢弃了很多数据,DROP就是干这事的,发不下就不放了,有点狠啊!!

LATEST

        Flowable.create(new FlowableOnSubscribe<String>() 
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception 
                for (int i = 0;i < 1000; i++) 
                    emitter.onNext("i = "+i);
                
            
        , BackpressureStrategy.LATEST)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() 
                    @Override
                    public void accept(String s) throws Exception 
                        Log.e("tag","----> "+s);
                    
                );

这次我们只发送1000个事件,运行结果如下:

LATEST策略下,当缓存池满了之后也是会丢弃事件的,不仅如此,它还会把事件的最后一个强制放入到缓存池中,所以可以看到999被观察者收到了。

上面我们都是用的Flowable的create创建的被观察者,如果我们使用just,fromArray等操作符该如何指定背压策略呢?其实也很简单,因为rxjava2像rxjava1那样也提供了onBackpressureBuffer(),onBackpressureDrop(),onBackpressureLatest(),这样用就可以了:

        Flowable.range(1,1000)
                .onBackpressureBuffer(1000)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() 
                    @Override
                    public void accept(Integer s) throws Exception 
                        Log.e("tag","----> "+s);
                    
                );

嗯,运行结果很稳:

那么可能我们会有个疑问,上面的例子都是观察者被动的接收事件,能不能主动拉取事件呢,当然可以,我们看下下面这个例子:

        Flowable.create(new FlowableOnSubscribe<String>() 
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception 
                for (int i = 0; i < 1000; i++) 
                    emitter.onNext("i = " + i);
                
            
        , BackpressureStrategy.BUFFER)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<String>() 
                    @Override
                    public void onSubscribe(Subscription s) 
                        subscription = s;
                    

                    @Override
                    public void onNext(String s) 
                        Log.e("tag", "----> " + s);
                    

                    @Override
                    public void onError(Throwable t) 

                    

                    @Override
                    public void onComplete() 

                    
                );

看下运行结果:

此时我们的观察者使用了Subscriber,它有一个onSubscribe方法,参数为Subscription,其实关键点就是这个Subscription,它的作用就是从缓存池拉取事件,它有一个request(count)方法,它的作用就是拉取事件,并可以指定拉取事件的个数。我们在上面的例子中,使用subscription.request(5)每次拉取5个事件,其实也是很简单的。

其实搞了半天,文章基本也要结束了,虽然rxjava提供了处理背压的策略,但是最好还是能尽量避免上游被观察者发送事件过快过多,实在需要处理,就结合各种策略和操作符进行按需处理。

3.项目中的使用

上周在项目中遇到了这么一个场景,就是在跳转页面之前需要释放camera,这是个耗时操作,返回当前页面的时候需要重新open Camera,而且open Camera的时机需要在SurfaceView的create中执行,这个场景刚好用request可以解决,例子和上面类似,就不再上代码了。

关于Flowable的使用就先到这吧,由于个人水平有限,难免会犯些错误,有问题欢迎留言讨论。

以上是关于AndroidRxjava2 Flowable详解与背压那些事的主要内容,如果未能解决你的问题,请参考以下文章

Flowable核心Service和部署流程详解

第五篇Flowable核心Service和部署流程详解

掌握Flowable核心流程操作的本质

第六篇Flowable核心流程操作的本质

Android RXJava2 内存性能

flowable编辑流程后id改变