Rxjava系列 RxJava2.0背压原理解析

Posted 太合音乐技术团队

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rxjava系列 RxJava2.0背压原理解析相关的知识,希望对你有一定的参考价值。

RxJava2.0有一个很大的特色是背压的支持,如果要使用背压的话需要使用 Flowable。为什么需要背压这种机制呢, 先抛开Flowable不说,我们想一个实际应用中的真实案例:如果发送事件和接收事件处于不同的线程中,而且事件处理的速度慢,事件发送的速度快,那么肯定需要一个池子来存储发送的事件等待下游消化,否则消息就会丢失。如果发送事件速度快而接收事件速度慢,那么这个池子会越来越大最终爆掉内存。背压策略正是为了解决这一问题而引出来的。

我们先思考一下,在异步的情况下,如何能让上游发送事件的速度和下游处理事件的速度能够保持一致呢。如果下游能够及时告诉上游它的处理速度,让上游的发送事件速度慢下来,是不是可以呢。带着这个问题我们首先来看下RxJava2.0中的背压策略的实现。 首先我们看一下 Flowable的基本使用案例:

 
   
   
 
  1. Flowable.create(new FlowableOnSubscribe<Integer>() {

  2.            @Override

  3.            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {

  4.                Log.d(TAG, "emit 1");

  5.                emitter.onNext(1);

  6.                Log.d(TAG, "emit 2");

  7.                emitter.onNext(2);

  8.                Log.d(TAG, "emit complete");

  9.                emitter.onComplete();

  10.            }

  11.        }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())

  12.                .observeOn(androidSchedulers.mainThread())

  13.                .subscribe(new Subscriber<Integer>() {

  14.                    @Override

  15.                    public void onSubscribe(Subscription s) {

  16.                        Log.d(TAG, "onSubscribe");

  17.                        mSubscription = s;

  18.                        mSubscription.request(1);

  19.                    }

  20.                    @Override

  21.                    public void onNext(Integer integer) {

  22.                        Log.d(TAG, "onNext: " + integer);

  23.                        mSubscription.request(1);

  24.                    }

  25.                    @Override

  26.                    public void onError(Throwable t) {

  27.                        Log.w(TAG, "onError: ", t);

  28.                    }

  29.                    @Override

  30.                    public void onComplete() {

  31.                        Log.d(TAG, "onComplete");

  32.                    }

  33.                });

仔细看一下上述案例中Flowable 的具体使用方法,里面有两个地方和我们之前Rxjava的使用经验不同,一个是创建Flowable 需要传入一个 BackpressureStrategy的参数,另外是在 Subscriber 的onSubscribe方法中调用 subscription.request 方法。这两处是做什么用的呢。回到开头我们提出的问题,为了解决上游发送事件和下游处理事件速度不一致的问题,我们需要一个策略能让下游告诉上游,下游的处理能力是怎样的,这两处不同就是为了解决这个问题。 

首先我们解释下 BackpressureStrategy参数的含义,我们这先介绍下概念,详细理解后续会继续介绍:

BackpressureStrategy.ERROR:若上游发送事件速度超出下游处理事件能力,且事件缓存池已满,则抛出异常 

BackpressureStrategy.BUFFER:若上游发送事件速度超出下游处理能力,则把事件存储起来等待下游处理 

BackpressureStrategy.DROP:若上游发送事件速度超出下游处理能力,事件缓存池满了后将之后发送的事件丢弃 

BackpressureStrategy.LATEST:若上有发送时间速度超出下游处理能力,则只存储最新的128个事件  备注:128是Flowable中事件缓存池的大小

这里穿插一下,为什么事件缓存池大小为 128,这个我们可以去看下 Flowable中的部分源码:

 
   
   
 
  1. /** The default buffer size. */

  2. static final int BUFFER_SIZE;

  3. static {

  4. BUFFER_SIZE = Math.max(16, Integer.getInteger("rx2.buffer-size", 128));

  5. }

Flowable 中默认的buffer大小是128,用来存储上游的事件。

那么这个subscripiton.request方法主要用来做什么呢,这个方法是下游用来告知上游处理事件的能力的。Flowable采用了一种响应式拉取的思路用来解决上下游处理速度不统一的问题。简而言之,所谓响应式拉取就是,下游向上游告知自己的处理能力,下游处理完一个事件,然后告诉上游,上游则继续发送事件,继续等待下游的通知。 当调用subscription.request(1)时,就是下游向上游说:我现在能处理一个事件。

我们上面的例子只是先大概介绍下,Flowable在使用时的不同以及背压的一些基本概念。那么在实际的使用中,如何使用Flowable提供的这些机制来使上下游的处理速度统一起来呢,我们继续向下分析。 之前我们提到调用 subscription.request方法之后,下游就通知了上游自己的处理能力,那么上游需要获知下游的处理能力才能决定怎样发送事件。那么上游通过什么样的方式获知这一数据呢。首先来看一下上游用来发送事件的 FlowableEmitter 的部分源码:

 
   
   
 
  1. public interface FlowableEmitter<T> extends Emitter<T> {

  2.    void setDisposable(Disposable s);

  3.    void setCancellable(Cancellable c);

  4.    /**

  5.     * The current outstanding request amount.

  6.     * <p>This method is thread-safe.

  7.     * @return the current outstanding request amount

  8.     */

  9.    long requested();  // 获取到当前请求事件的数量

  10.    boolean isCancelled();

  11.    FlowableEmitter<T> serialize();

  12. }

FlowableEmitter中有一个方法requested ,注释显示这个方法的作用是获取当前的请求数量。那么这个方法是否为获取当前下游处理能力的接口,我们可以写个小demo来进行验证:

 
   
   
 
  1.        Flowable

  2.                .create(new FlowableOnSubscribe<Integer>() {

  3.                    @Override

  4.                    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {

  5.                        Log.d(TAG, "First requested = " + emitter.requested());

  6.                        boolean flag;

  7.                        for (int i = 0; ; i++) {

  8.                            flag = false;

  9.                            while (emitter.requested() == 0) {

  10.                                if (!flag) {

  11.                                    Log.d(TAG, "can't emit value !");

  12.                                    flag = true;

  13.                                }

  14.                            }

  15.                            emitter.onNext(i);

  16.                            Log.d(TAG, "emit " + i + " , requested = " + emitter.requested());

  17.                        }

  18.                    }

  19.                }, BackpressureStrategy.ERROR)

  20.                .subscribeOn(Schedulers.io())

  21.                .observeOn(AndroidSchedulers.mainThread())

  22.                .subscribe(new Subscriber<Integer>() {

  23.                    @Override

  24.                    public void onSubscribe(Subscription s) {

  25.                        Log.d(TAG, "onSubscribe");

  26.                        mSubscription = s;

  27.                        mSubscription.request(3);

  28.                    }

  29.                    @Override

  30.                    public void onNext(Integer integer) {

  31.                        Log.d(TAG, "onNext: " + integer);

  32.                    }

  33.                    @Override

  34.                    public void onError(Throwable t) {

  35.                        Log.w(TAG, "onError: ", t);

  36.                    }

  37.                    @Override

  38.                    public void onComplete() {

  39.                        Log.d(TAG, "onComplete");

  40.                    }

  41.                });

运行之后我们可以看下结果:

 
   
   
 
  1. D/TAG: onSubscribe                        

  2. D/TAG: requested = 128        

  3. D/TAG: emit event 1, requested = 127        

  4. D/TAG: onNext: 1                          

  5. D/TAG: emit event 2, requested = 126

  6. D/TAG: onNext: 2

  7. D/TAG: emit event 3, requested = 125

  8. D/TAG: onNext: 3  

  9. D/TAG: emit event 4, requested = 124

  10. D/TAG: emit event 5, requested = 123

  11. ......

  12. D/TAG: emit event 128, requested = 0          

通过代码跑出来的结果发现,不管下游通过s.request()设置的请求值是多少,我们在上游获取到的初始的requested请求数量都是128,随着事件的发送这个值会递减。这个原因是什么呢:前面我们分析过,Flowable内部有一个128大小的事件缓存池,从上游产生的事件,会先放到事件缓存中,然后再交由下游处理,如果下游处理不过来就会一直在缓存池中。因此上游在产生事件的时候,需要先考虑缓存池中是否依然有空间,如果缓存池中已经没有空间而继续发送事件的话容易产生异常,这就是为什么requested的值不是下游请求的事件数量的原因,因为即使上游发送的事件数量超出了下游的数据需求量,也可以放在缓存中,如果没有超过下游请求量,则将缓存池中的数据传递给下游。

那emitter.requested()的值在发送完128个数据之后就递减到0了,按照demo中的写法就没法继续发送事件了,如果下游的请求量大于128 的话,岂不是多于128的事件都无法获取到了。事实是这样吗,我们继续通过demo来进行验证,我们将demo中 mSubscription.request(3)改为 mSubscription.request(96)。我们会看到结果如下所示:

 
   
   
 
  1. D/TAG: onSubscribe                        

  2. D/TAG: requested = 128        

  3. D/TAG: emit event 1, requested = 127        

  4. D/TAG: onNext: 1                          

  5. D/TAG: emit event 2, requested = 126

  6. D/TAG: onNext: 2

  7. ......

  8. D/TAG: emit event 95, requested = 33

  9. D/TAG: onNext: 95  

  10. D/TAG: emit event 96, requested = 95

  11. D/TAG: emit event 97, requested = 94

  12. ......

  13. D/TAG: emit event 223, requested = 0  

从结果可以看出,emitter.requested()数量并不是一直递减的,在递减到33时,又回升到了95。也就是说,异步缓存池中的事件并不是下游处理一条便清除掉一条,而是等到下游累计处理了95条之后,集中清理一次缓存池,这时候缓存池中又有了95个空位,可以继续向缓存池中存储事件了。这样的策略设计,我们在使用背压的时候只需要考虑异步缓存池中是否放得下,缓存池中有空间便可继续发送事件,没有空间则等一等,等到下游处理完95条之后再继续发送。Flowable中的背压机制便是这样来运行的。

参考文章:

https://github.com/ReactiveX/RxJava/wiki/Backpressure

http://www.jianshu.com/p/464fa025229e

http://www.jianshu.com/p/ff8167c1d191


以上是关于Rxjava系列 RxJava2.0背压原理解析的主要内容,如果未能解决你的问题,请参考以下文章

RxJava 线程切换原理

RxJava2.0学习笔记2 2018年3月29日 星期四

RxJava2.0的使用详解

浅析RxJava 1.x&2.x版本使用区别及原理:ObservableFlowable等基本元素源码解析

浅析RxJava 1.x&2.x版本使用区别及原理:ObservableFlowable等基本元素源码解析

关于RxJava2 0你不知道的事