Rxjava 整理(未完)

Posted 且听真言

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rxjava 整理(未完)相关的知识,希望对你有一定的参考价值。

一、定义

RxJava 是一个 基于事件流、实现异步操作的库。

二、gradle配置

 implementation 'io.reactivex.rxjava3:rxjava:3.0.0'

三、 Rxjava相关操作符介绍

1.创建操作符。

1.create()

创建被观察者对象。

private fun doRxJavaCommonTest() {

    Observable.create(ObservableOnSubscribe<String> {

        Log.v(TAG, "---> subscribe")
        it.onNext("")
        it.onNext("")
        it.onComplete()

    }).subscribe(object : Observer<String> {
        override fun onComplete() {
            Log.v(TAG, "---> onComplete")
        }

        override fun onSubscribe(d: Disposable?) {
            Log.v(TAG, "---> onSubscribe")
        }

        override fun onNext(t: String?) {
            Log.v(TAG, "---> onNext")
        }

        override fun onError(e: Throwable?) {
            Log.v(TAG, "---> onnError")
        }
    })
}

2.just()

主要作用就是创建一个被观察者,并发送事件,但是发送的事件不可以超过10个以上。

超过10个就会编译报错。

private fun testJust() {
    Observable.just("1", "2", "3").subscribe(object : Observer<String> {
        override fun onComplete() {
            Log.v(TAG, "onComplete")
        }

        override fun onSubscribe(d: Disposable?) {
            Log.v(TAG, "onSubscribe")
        }

        override fun onNext(t: String?) {
            Log.v(TAG, "接收到了事件$t")
        }

        override fun onError(e: Throwable?) {
            Log.v(TAG, "onError:$e")
        }
    })
}
2021-06-05 15:38:55.690 29674-29674/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe
2021-06-05 15:38:55.690 29674-29674/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件1
2021-06-05 15:38:55.691 29674-29674/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件2
2021-06-05 15:38:55.691 29674-29674/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件3
2021-06-05 15:38:55.691 29674-29674/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onComplete

3. timer()

作用:当到指定时间后就会发送一个0的值给观察者。

在项目中,可以做一些延时的处理,类似于Handler中的延时。

延迟5秒后,将结果发送给观察者,Consumer和Observer是创建观察者的两种写法,相当于观察者中的onNext方法。

private fun testTimer() {
    Observable.timer(5, TimeUnit.SECONDS).subscribe(object : Consumer<Long> {
        override fun accept(t: Long?) {
            Log.v(TAG, t.toString())
        }
    })
}
2021-06-05 16:03:38.900 30043-30088/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 0

4 interval()

每隔一段时间就会发送一个事件,这个事件是从0开始,不断增1的数字。 类似于android中的Timer,做计时器用。

private fun testInterval() {
    Observable.interval(2, TimeUnit.SECONDS).subscribe(object : Consumer<Long> {
        override fun accept(t: Long?) {
            Log.v(TAG, t.toString())
        }
    })
}
2021-06-05 16:22:22.218 30163-30224/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 0
2021-06-05 16:22:24.218 30163-30224/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 1
2021-06-05 16:22:26.217 30163-30224/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 2
2021-06-05 16:22:28.218 30163-30224/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 3
2021-06-05 16:22:30.219 30163-30224/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 4
2021-06-05 16:22:32.218 30163-30224/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 5
2021-06-05 16:22:34.218 30163-30224/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 6
... ...

5 intervalRange()

可以指定发送事件的开始值(100)和数量(4),其他与 interval() 的功能一样。(参数依次是:开始值,循环执行的次数,初始延迟时间,执行间隔时间,时间单位)

private fun testIntervalRange() {
    Observable.intervalRange(100, 4, 0, 10, TimeUnit.SECONDS)
        .subscribe(object : Consumer<Long> {
            override fun accept(t: Long?) {
                Log.v(TAG, t.toString())
            }
        })
}
2021-06-05 16:40:22.510 30432-30500/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 100
2021-06-05 16:40:32.517 30432-30500/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 101
2021-06-05 16:40:42.511 30432-30500/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 102
2021-06-05 16:40:52.513 30432-30500/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 103

6 range()

发送一定范围的事件序列。

private fun testRange() {
    Observable.range(0, 5).subscribe(object : Consumer<Int> {
        override fun accept(t: Int?) {
            Log.v(TAG, t.toString())
        }
    })
}

2021-06-05 18:25:12.439 31282-31282/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 0
2021-06-05 18:25:12.439 31282-31282/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 1
2021-06-05 18:25:12.439 31282-31282/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 2
2021-06-05 18:25:12.439 31282-31282/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 3
2021-06-05 18:25:12.440 31282-31282/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 4

7 rangeLong()

作用与 range() 一样,数据类型为 Long

private fun testLongRange() {
    Observable.rangeLong(0,3).subscribe(object : Consumer<Long>{
        override fun accept(t: Long?) {
            Log.v(TAG, t.toString())
        }

    })
}

2021-06-05 18:54:54.821 31573-31573/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 0
2021-06-05 18:54:54.821 31573-31573/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 1
2021-06-05 18:54:54.821 31573-31573/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 2

8 empty() 、 never() 、 error()

empty():直接发送 onComplete() 事件,所以只会执行 onSubscribe 和 onComplete

private fun testEmpty() {
    Observable.empty<Int>().subscribe(object : Observer<Int> {
        override fun onComplete() {
            Log.v(TAG, "onComplete")
        }

        override fun onSubscribe(d: Disposable?) {
            Log.v(TAG, "onSubscribe")
        }

        override fun onNext(t: Int?) {
            Log.v(TAG, t.toString())
        }

        override fun onError(e: Throwable?) {
            Log.v(TAG, e.toString())
        }
    })
}
2021-06-05 19:12:06.043 31720-31720/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe
2021-06-05 19:12:06.043 31720-31720/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onComplete

error():

private fun testError() {
    val subscribe = Observable.error<Int>(object : Throwable() {
        init {
            Log.v(TAG, "error init")
        }
    }).subscribe(object : Observer<Int> {
        override fun onComplete() {
            Log.v(TAG, "onComplete")
        }

        override fun onSubscribe(d: Disposable?) {
            Log.v(TAG, "onSubscribe")
        }

        override fun onNext(t: Int?) {
            Log.v(TAG, t.toString())
        }

        override fun onError(e: Throwable?) {
            Log.v(TAG, e.toString())
        }
    })
}
2021-06-06 10:35:23.908 500-500/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: error init
2021-06-06 10:35:23.954 500-500/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe
2021-06-06 10:35:23.954 500-500/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: com.example.rxjavademo.MainActivity$testError$subscribe$1

never():

private fun testNever() {
    Observable.never<Int>().subscribe(object : Observer<Int> {
        override fun onComplete() {
            Log.v(TAG, "onComplete")
        }

        override fun onSubscribe(d: Disposable?) {
            Log.v(TAG, "onSubscribe")
        }

        override fun onNext(t: Int?) {
            Log.v(TAG, t.toString())
        }

        override fun onError(e: Throwable?) {
            Log.v(TAG, e.toString())
        }
    })
}
2021-06-06 10:44:38.259 873-873/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe

2.转换操作符

1.map

可以将被观察者发送的数据类型转变成其他的类型,再传给观察者。

private fun testMap() {
    Observable.just(1, 2, 3).map(object : Function<Int, Int> {
        override fun apply(t: Int?): Int {
            return ((t ?: 0) + 1)
        }
    }).subscribe(object : Observer<Int> {
        override fun onComplete() {
            Log.v(TAG, "onComplete")
        }

        override fun onSubscribe(d: Disposable?) {
            Log.v(TAG, "onSubscribe")
        }

        override fun onNext(t: Int?) {
            Log.v(TAG, t.toString())
        }

        override fun onError(e: Throwable?) {
            Log.v(TAG, e.toString())
        }
    })
}

2021-06-06 11:01:48.716 1400-1400/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe
2021-06-06 11:01:48.716 1400-1400/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 2
2021-06-06 11:01:48.716 1400-1400/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 3
2021-06-06 11:01:48.716 1400-1400/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 4
2021-06-06 11:01:48.716 1400-1400/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onComplete

2 flatMap()

将事件序列中的元素进行加工,返回一个新的被观察者。 flatMap() 其实与 map() 类似,但是 flatMap() 返回的是一个 Observerable,map()只是返回数据,如果在元素再加工的时候,想再使用上面的创建操作符的话,建议使用flatMap(),而非map()。

private fun flatMap() {
    Observable.just(1, 2, 3, 4).flatMap(object : Function<Int, ObservableSource<Int>> {
        override fun apply(t: Int?): ObservableSource<Int> {
            return if (t == 4 || t == null) {
                Observable.error(Exception("4是错误数字"))
            } else {
                Observable.just(t + 10)
            }
        }
    }).subscribe(object : Consumer<Int> {
        override fun accept(t: Int?) {
            Log.v(TAG, t.toString())
        }
    }, object :Consumer<Throwable> {
        override fun accept(t: Throwable?) {
            Log.v(TAG, t.toString())
        }

    })
}

2021-06-06 11:53:59.738 2310-2310/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 11
2021-06-06 11:53:59.738 2310-2310/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 12
2021-06-06 11:53:59.738 2310-2310/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 13
2021-06-06 11:53:59.744 2310-2310/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: java.lang.Exception: 4是错误数字

3 concatMap()

concatMap() 和 flatMap() 基本上是一样的,只不过 concatMap() 转发出来的事件是有序的,而 flatMap() 是无序的。

private fun testConcatMap() {
    Observable.just(1, 2, 3, 4)
        .concatMap(object : Function<Int, ObservableSource<Int>> {
            override fun apply(t: Int?): ObservableSource<Int> {
                if (t == 4 || t == null) {
                    return Observable.error(Exception("4 或者 null不是有效数字"))
                }
                return Observable.just(t + 10)
            }
        }).subscribe(object : Consumer<Int> {
            override fun accept(t: Int?) {
                Log.v(TAG, t.toString())
            }
        }, object : Consumer<Throwable> {
            override fun accept(t: Throwable?) {
                Log.v(TAG, t?.message?.toString() ?: "")
            }
        })
}
 14:23:59.811 3252-3252/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 11
2021-06-06 14:23:59.812 3252-3252/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 12
2021-06-06 14:23:59.812 3252-3252/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 13
2021-06-06 14:23:59.814 3252-3252/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 4 或者 null不是有效数字

4 buffer()

从需要发送的事件当中获取一定数量的事件,并将这些事件放到缓冲区当中一并发出。

buffer 有两个参数,一个是 count,另一个 skip。count 缓冲区元素的数量,skip 就代表缓冲区满了之后,发送下一次事件序列的时候要跳过多少元素。

private fun testBuffer() {
    Observable.just(1, 2, 3, 4, 5)
        .buffer(2, 1)
        .subscribe(object : Consumer<List<Int>> {
            override fun accept(t: List<Int>?) {
                Log.v(TAG, "缓冲区大小:" + t?.size)

                t?.forEach {
                    Log.v(TAG, it.toString())
                }
            }
        })
}

2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 缓冲区大小:2
2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 1
2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 2
2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 缓冲区大小:2
2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 2
2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 3
2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 缓冲区大小:2
2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 3
2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 4
2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 缓冲区大小:2
2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 4
2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 5
2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 缓冲区大小:1
2021-06-06 14:45:43.605 3528-3528/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 5

5 scan()

将发射的数据通过一个函数进行变换,然后将变换后的结果作为参数跟下一个发射的数据一起继续通过那个函数变换,这样依次连续发射得到最终结果。

private fun testScan() {
    Observable.just(1, 2, 3, 4, 5, 6)
        .scan(object : BiFunction<Int, Int, Int> {
            override fun apply(t1: Int?, t2: Int?): Int {
                Log.v(TAG, "t1:" + t1 + "t2:" + t2)
                return ((t1 ?: 0) + (t2 ?: 0))
            }
        }).subscribe(object : Consumer<Int> {
            override fun accept(t: Int?) {
                Log.v(TAG, t.toString())
            }
        })
}

2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 1
2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: t1:1t2:2
2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 3
2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: t1:3t2:3
2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 6
2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: t1:6t2:4
2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 10
2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: t1:10t2:5
2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 15
2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: t1:15t2:6
2021-06-06 14:58:38.200 3698-3698/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 21

四、RxJava源码分析

1.RxJava订阅流程

2.onNext与onComplete

3.理解RxJava是切换线程

private fun testChangeMainThread() {
    Observable.create(ObservableOnSubscribe<String> {
        it.onNext("1")
        it.onNext("2")
        it.onComplete()
        Log.v(TAG, "subscribe and currentThread is:" + Thread.currentThread())
    })
        //切换到子线程
        .subscribeOn(Schedulers.io())
        //切换到主线程
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(object : Observer<String> {
            override fun onSubscribe(d: Disposable?) {
                Log.v(TAG, "onSubscribe and currentThread is:" + Thread.currentThread())
            }

            override fun onNext(t: String?) {
                Log.v(
                    TAG,
                    "onNext:" + t.toString() + "and currentThread is:" + Thread.currentThread()
                )
            }

            override fun onError(e: Throwable?) {
                Log.v(
                    TAG,
                    "onError:" + e?.message + "and currentThread is:" + Thread.currentThread()
                )
            }

            override fun onComplete() {
                Log.v(TAG, "onComplete" + "and currentThread is:" + Thread.currentThread())
            }
        })
}

2021-06-08 20:44:40.133 6053-6053/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe and currentThread is:Thread[main,5,main]
2021-06-08 20:44:40.138 6053-6112/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: subscribe and currentThread is:Thread[RxCachedThreadScheduler-1,5,main]
2021-06-08 20:44:40.138 6053-6053/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:1and currentThread is:Thread[main,5,main]
2021-06-08 20:44:40.139 6053-6053/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:2and currentThread is:Thread[main,5,main]
2021-06-08 20:44:40.139 6053-6053/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onCompleteand currentThread is:Thread[main,5,main]

通过上面的log 可以发现:

1.Observer(观察者)onSubscribe() 方法运行在当前线程中。

2.Observable(被观察者)中的 subscribe() 方法运行在subsribeOn()指定的线程中。

3.Observer(观察者)的onNext()和onComplete()和onError()运行在observerOn()指定的线程中。

RxJava线程切换只要涉及两个方法:subscribeOn()和observeOn()。

1.subscribeOn()源码分析

subscribeOn(Schedulers.io())

subscribeOn()传入一个Scheduler类实例,Scheduler是一个调度类,能够延时或周期性地去执行一个任务。

Schedulers类的io()方法执行流程

  • Scheduler IO (创建流程)

AndroidSchedulers.mainThread()

  • main thread创建流程:

subscribeOn,observeOn。subscribeOn是用来调整被观察者(发射源)的线程,而observeOn是调整观察者(处理器)的线程。

五、RxJava 背压策略

背压:解决了 因被观察者发送事件速度 与 观察者接收事件速度 不匹配(一般是前者 快于 后者),从而导致观察者无法及时响应 / 处理所有 被观察者发送事件 的问题

1.观察者不接收事件的情况下,被观察者继续发送事件,存放到缓存区;按需取出。

R.id.test_Flowable4 -> {
    mSubscription?.request(2)
}

/**
 * 观察者不接收事件扽情况下,被观察者继续发送事件,存放到缓存区,按需取出
 */
private var textFlowableTv: TextView? = null
private var mSubscription: Subscription? = null
private fun testFlowable5() {
    Flowable.create(FlowableOnSubscribe<Int> {
        Log.v(TAG, "发送事件 1")
        it.onNext(1)
        Log.v(TAG, "发送事件 2")
        it.onNext(2)
        Log.v(TAG, "发送事件 3")
        it.onNext(3)
        Log.v(TAG, "发送事件 4")
        it.onNext(4)
        Log.v(TAG, "发送完成")
        it.onComplete()
    }, BackpressureStrategy.ERROR)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(object : Subscriber<Int> {
            override fun onSubscribe(s: Subscription?) {
                mSubscription = s
            }

            override fun onNext(t: Int?) {
                Log.v(TAG, "onNext:$t")
            }

            override fun onError(t: Throwable?) {
                Log.v(TAG, "onError:$t")
            }

            override fun onComplete() {
                Log.v(TAG, "onComplete")
            }

        })
}
021-06-27 16:47:29.121 24137-24174/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 1
2021-06-27 16:47:29.123 24137-24174/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 2
2021-06-27 16:47:29.124 24137-24174/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 3
2021-06-27 16:47:29.125 24137-24174/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 4
2021-06-27 16:47:29.126 24137-24174/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送完成
2021-06-27 16:47:32.594 24137-24137/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:1
2021-06-27 16:47:32.595 24137-24137/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:2

     

2.观察者不接收事件的情况下,被观察者继续发送事件至超出缓存区大小(128)

private fun testFlowableError() {
    Flowable.create(FlowableOnSubscribe<Int> {
        //发送129个事件,即超出了缓存区的大小
        for (i in 0..128) {
            Log.v(TAG, "发送事件:$i")
            it.onNext(i)
        }
        it.onComplete()
    }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(object : Subscriber<Int> {
            override fun onSubscribe(s: Subscription?) {
                Log.v(TAG, "onSubscribe")
            }

            override fun onNext(t: Int?) {
                Log.v(TAG, "onNext:$t")
            }

            override fun onError(t: Throwable?) {
                Log.v(TAG, "onError:$t")
            }

            override fun onComplete() {
                Log.v(TAG, "onComplete")
            }

        })
}

2021-06-27 17:33:19.096 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件:117
2021-06-27 17:33:19.096 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件:118
2021-06-27 17:33:19.096 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件:119
2021-06-27 17:33:19.096 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件:120
2021-06-27 17:33:19.096 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件:121
2021-06-27 17:33:19.096 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件:122
2021-06-27 17:33:19.096 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件:123
2021-06-27 17:33:19.097 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件:124
2021-06-27 17:33:19.097 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件:125
2021-06-27 17:33:19.098 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件:126
2021-06-27 17:33:19.098 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件:127
2021-06-27 17:33:19.098 25381-25412/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件:128
2021-06-27 17:33:19.101 25381-25381/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onError:io.reactivex.rxjava3.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests

3.同步订阅 与 异步订阅 的区别

  • 同步订阅中,被观察者与 观察者工作于同1线程
  • 同步订阅关系中没有缓存区

被观察者在发送1个事件后,必须等待观察者接收后,才能继续发下1个事件

private fun testFlowable6() {
    Flowable.create(FlowableOnSubscribe<Int> {
        Log.v(TAG, "发送事件 1")
        it.onNext(1)
        Log.v(TAG, "发送事件 2")
        it.onNext(2)
        Log.v(TAG, "发送事件 3")
        it.onNext(3)
        Log.v(TAG, "发送事件 4")
        it.onNext(4)
        Log.v(TAG, "发送完成")
        it.onComplete()
    }, BackpressureStrategy.ERROR).subscribe(object : Subscriber<Int> {
        override fun onSubscribe(s: Subscription?) {
            s?.request(4)
        }

        override fun onNext(t: Int?) {
            Log.v(TAG, "onNext:$t")
        }

        override fun onError(t: Throwable?) {
            Log.v(TAG, "onError:$t")
        }

        override fun onComplete() {
            Log.v(TAG, "onComplete")
        }

    })
}

2021-06-27 17:57:45.755 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 1
2021-06-27 17:57:45.757 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:1
2021-06-27 17:57:45.757 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 2
2021-06-27 17:57:45.757 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:2
2021-06-27 17:57:45.757 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 3
2021-06-27 17:57:45.757 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:3
2021-06-27 17:57:45.757 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 4
2021-06-27 17:57:45.757 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:4
2021-06-27 17:57:45.757 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送完成
2021-06-27 17:57:45.759 25771-25771/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onComplete

 观察者只能接受3个事件,但被观察者却发送了4个事件,所以出现了不匹配情况

private fun testFlowable6() {
    Flowable.create(FlowableOnSubscribe<Int> {
        Log.v(TAG, "发送事件 1")
        it.onNext(1)
        Log.v(TAG, "发送事件 2")
        it.onNext(2)
        Log.v(TAG, "发送事件 3")
        it.onNext(3)
        Log.v(TAG, "发送事件 4")
        it.onNext(4)
        Log.v(TAG, "发送完成")
        it.onComplete()
    }, BackpressureStrategy.ERROR).subscribe(object : Subscriber<Int> {
        override fun onSubscribe(s: Subscription?) {

            s?.request(3)
        }

        override fun onNext(t: Int?) {
            Log.v(TAG, "onNext:$t")
        }

        override fun onError(t: Throwable?) {
            Log.v(TAG, "onError:$t")
        }

        override fun onComplete() {
            Log.v(TAG, "onComplete")
        }

    })
}

2021-06-27 18:37:05.470 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 1
2021-06-27 18:37:05.472 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:1
2021-06-27 18:37:05.472 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 2
2021-06-27 18:37:05.472 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:2
2021-06-27 18:37:05.472 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 3
2021-06-27 18:37:05.472 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onNext:3
2021-06-27 18:37:05.472 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 4
2021-06-27 18:37:05.474 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onError:io.reactivex.rxjava3.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
2021-06-27 18:37:05.474 26474-26474/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送完成

观察者不接收事件的情况:

private fun testFlowable7() {
    Flowable.create(FlowableOnSubscribe<Int> {
        Log.v(TAG, "发送事件 1")
        it.onNext(1)
        Log.v(TAG, "发送事件 2")
        it.onNext(2)
        Log.v(TAG, "发送事件 3")
        it.onNext(3)
        Log.v(TAG, "发送事件 4")
        it.onNext(4)
        Log.v(TAG, "发送完成")
        it.onComplete()
    }, BackpressureStrategy.ERROR).subscribe(object :Subscriber<Int> {
        override fun onSubscribe(s: Subscription?) {
            Log.v(TAG, "onSubscribe")
        }

        override fun onNext(t: Int?) {
            Log.v(TAG, "onNext:$t")
        }

        override fun onError(t: Throwable?) {
            Log.v(TAG, "onError:$t")
        }

        override fun onComplete() {
            Log.v(TAG, "onComplete")
        }

    })
}

2021-06-27 19:04:50.591 26746-26746/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe
2021-06-27 19:04:50.593 26746-26746/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 1
2021-06-27 19:04:50.596 26746-26746/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onError:io.reactivex.rxjava3.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
2021-06-27 19:04:50.596 26746-26746/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 2
2021-06-27 19:04:50.596 26746-26746/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 3
2021-06-27 19:04:50.596 26746-26746/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件 4
2021-06-27 19:04:50.596 26746-26746/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送完成

4.控制 被观察者发送事件 的速度

1.被观察者根据观察者自身接收事件能力(10个事件),从而仅发送10个事件

private fun testFlowable8() {
    Flowable.create(FlowableOnSubscribe<Int> {
        //调用it.requested()获取当前观察者需要接收扽事件数量
        val n = it.requested()
        Log.v(TAG, "观察者可接收事件:" + n)

        //根据it.requestd()的值,即当前观察者需要接收扽事件数量来发送事件
        for (i in 1..n) {
            Log.v(TAG, "发送了事件" + i)
            it.onNext(i.toInt())
        }

    }, BackpressureStrategy.ERROR)
        .subscribe(object :Subscriber<Int> {
            override fun onSubscribe(s: Subscription?) {
                Log.v(TAG, "onSubscribe")

                //设置观察者每次能接收5个事件
                s?.request(5)
            }

            override fun onNext(t: Int?) {
                Log.v(TAG, "接收到了事件$t")
            }

            override fun onError(t: Throwable?) {
                Log.v(TAG, "onError():$t")
            }

            override fun onComplete() {
                Log.v(TAG, "onComplete");
            }
        })
}

2021-07-01 18:19:06.266 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe
2021-07-01 18:19:06.266 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 观察者可接收事件:5
2021-07-01 18:19:06.266 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件1
2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件1
2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件2
2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件2
2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件3
2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件3
2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件4
2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件4
2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件5
2021-07-01 18:19:06.267 23497-23497/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件5

在同步订阅情况中使用FlowableEmitter.requested()时,要注意的:

1.观察者可连续要求接收事件,被观察者会进行叠加并一起发送。

private fun testFlowable9() {
    Flowable.create(FlowableOnSubscribe<Int> {
        //调用it.requested()获取当前观察者需要接收的事件数量
        Log.v(TAG, "观察者可接收事件" + it.requested())
    }, BackpressureStrategy.ERROR).subscribe(object : Subscriber<Int> {
        override fun onSubscribe(s: Subscription?) {
            Log.v(TAG, "onSubscribe")
            s?.request(5) 
            s?.request(10) 
        }

        override fun onNext(t: Int?) {
            Log.v(TAG, "接收到了事件$t")
        }

        override fun onError(t: Throwable?) {
            Log.v(TAG, "onError: ", t)
        }

        override fun onComplete() {
            Log.v(TAG, "onComplete")
        }
    })
}

2021-07-01 20:14:27.196 7449-7449/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe
2021-07-01 20:14:27.198 7449-7449/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 观察者可接收事件15

2. 每次发送事件后,emitter.requested()会实时更新观察者能接受的事件。

private fun testFlowable10() {
    Flowable.create(FlowableOnSubscribe<Int> {
        //1.调用t.requested()获取当前观察者需要接收的事件的数量
        Log.v(TAG, "观察者可接收事件数量:" + it.requested())

        //2. 每次发送事件后,t.requested()会实时更新观察者能接受的事件
        // 即一开始观察者要接收10个事件,发送了1个后,会实时更新为9个
        Log.v(TAG, "发送了事件1")
        it.onNext(1)
        Log.v(TAG, "发送了事件1后, 还需要发送事件数量:" + it.requested())

        Log.v(TAG, "发送了事件2")
        it.onNext(2)
        Log.d(TAG, "发送事件2后, 还需要发送事件数量:" + it.requested())

        Log.d(TAG, "发送了事件3");
        it.onNext(3);
        Log.d(TAG, "发送事件3后, 还需要发送事件数量:" + it.requested())
        it.onComplete()
    }, BackpressureStrategy.ERROR)
        .subscribe(object : Subscriber<Int> {
            override fun onSubscribe(s: Subscription?) {
                Log.d(TAG, "onSubscribe")
                s?.request(5)
            }

            override fun onNext(t: Int?) {
                Log.v(TAG, "接收到了事件$t")
            }

            override fun onError(t: Throwable?) {
                Log.v(TAG, "onError: $t")
            }

            override fun onComplete() {
                Log.v(TAG, "onComplete")
            }

        })
}

2021-07-01 20:37:58.644 8065-8065/com.example.rxjavademo D/MAINACTIVITY_RXJAVA: onSubscribe
2021-07-01 20:37:58.646 8065-8065/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 观察者可接收事件数量:5
2021-07-01 20:37:58.647 8065-8065/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件1
2021-07-01 20:37:58.647 8065-8065/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件1
2021-07-01 20:37:58.648 8065-8065/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件1后, 还需要发送事件数量:4
2021-07-01 20:37:58.648 8065-8065/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件2
2021-07-01 20:37:58.648 8065-8065/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件2
2021-07-01 20:37:58.648 8065-8065/com.example.rxjavademo D/MAINACTIVITY_RXJAVA: 发送事件2后, 还需要发送事件数量:3
2021-07-01 20:37:58.648 8065-8065/com.example.rxjavademo D/MAINACTIVITY_RXJAVA: 发送了事件3
2021-07-01 20:37:58.649 8065-8065/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件3
2021-07-01 20:37:58.649 8065-8065/com.example.rxjavademo D/MAINACTIVITY_RXJAVA: 发送事件3后, 还需要发送事件数量:2
2021-07-01 20:37:58.649 8065-8065/com.example.rxjavademo D/MAINACTIVITY_RXJAVA: onComplete

 3.异常

当FlowableEmitter.requested()减到0时,则代表观察者已经不可接收事件。

此时被观察者若继续发送事件,则会抛出MissingBackpressureException异常。

若观察者没有设置可接收事件数量,即无调用Subscription.request()

那么被观察者默认观察者可接收事件数量 = 0,即FlowableEmitter.requested()的返回值 = 0

private fun testFlowable11() {
    Flowable.create(FlowableOnSubscribe<Int> {
        // 1. 调用emitter.requested()获取当前观察者需要接收的事件数量
        Log.v(TAG, "观察者可接收事件数量=" + it.requested())
        it.onNext(1)
        it.onNext(2)
        it.onNext(3)
        it.onComplete()
    }, BackpressureStrategy.ERROR)
        .subscribe(object : Subscriber<Int> {
            override fun onSubscribe(s: Subscription?) {
                Log.v(TAG, "onSubscribe")
                s?.request(2)
            }

            override fun onNext(t: Int?) {
                Log.v(TAG, "接收到了事件$t")
            }

            override fun onError(t: Throwable?) {
                Log.v(TAG, "onError:$t");
            }

            override fun onComplete() {
                Log.v(TAG, "onComplete");
            }

        })
}

2021-07-01 21:15:50.807 8877-8877/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe
2021-07-01 21:15:50.809 8877-8877/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 观察者可接收事件数量=2
2021-07-01 21:15:50.810 8877-8877/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件1
2021-07-01 21:15:50.811 8877-8877/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件2
2021-07-01 21:15:50.812 8877-8877/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onError:io.reactivex.rxjava3.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests

 4.异步订阅相关

FlowableEmitter.requested()知道观察者自身接收事件能力,即 被观察者不能根据 观察者自身接收事件的能力 控制发送事件的速度。

private fun testFlowable12() {
    Flowable.create(FlowableOnSubscribe<Int> {
        // 调用emitter.requested()获取当前观察者需要接收的事件数量
        Log.d(TAG, "观察者可接收事件数量 = " + it.requested())
    }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(object :Subscriber<Int> {
            override fun onSubscribe(s: Subscription?) {
                Log.v(TAG, "onSubscribe")
                s?.request(100)
                // 该设置仅影响观察者线程中的requested,却不会影响的被观察者中的FlowableEmitter.requested()的返回值
                // 因为FlowableEmitter.requested()的返回值 取决于RxJava内部调用request(n),而该内部调用会在一开始就调用request(128)
            }

            override fun onNext(t: Int?) {
                Log.v(TAG, "接收到了事件$t")
            }

            override fun onError(t: Throwable?) {
                Log.v(TAG, "onError:$t");
            }

            override fun onComplete() {
                Log.d(TAG, "onComplete")
            }

        })
}

2021-07-01 21:54:17.645 9315-9315/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe
2021-07-01 21:54:17.651 9315-9355/com.example.rxjavademo D/MAINACTIVITY_RXJAVA: 观察者可接收事件数量 = 128

 通过RxJava内部固定调用被观察者线程中的request(n) 从而 反向控制被观察者的发送事件速度

R.id.testFlowableBtn13 -> {
    // 点击按钮才会接收事件 = 5 次
    // 点击按钮 则 接收5个事件
   mSubscription2?.request(5)
}

private var mSubscription2: Subscription? = null

// 被观察者:一共需要发送500个事件,但真正开始发送事件的前提 = FlowableEmitter.requested()返回值 ≠ 0
// 观察者:每次接收事件数量 = 48(点击按钮)
private fun testFlowable13() {
    Flowable.create(FlowableOnSubscribe<Int> {
        Log.v(TAG, "观察者可接收事件数量:" + it.requested())
        var flag: Boolean
        //被观察者一共需要发送500个事件
        for (i in 1..500) {
            flag = false
            //requested()==0则不发送
            while (it.requested() == 0L) {
                if (!flag) {
                    Log.v(TAG, "不再发送")
                    flag = true
                }
            }
            //requested() != 0 才发送
            Log.v(TAG, "发送了事件" + i + ",观察者可接收事件数量:" + it.requested())
            it.onNext(i)
        }

    }, BackpressureStrategy.ERROR)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(object : Subscriber<Int> {
            override fun onSubscribe(s: Subscription?) {
                Log.v(TAG, "onSubscribe")
                mSubscription2 = s
                //初始状态 = 不接收事件;通过点击按钮接收事件
            }

            override fun onNext(t: Int?) {
                Log.v(TAG, "接收到了事件$t")
            }

            override fun onError(t: Throwable?) {
                Log.v(TAG, "onError: $t")
            }

            override fun onComplete() {
                Log.v(TAG, "onComplete")
            }

        })

}
2021-07-04 11:48:53.669 13219-13219/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onSubscribe
2021-07-04 11:48:53.673 13219-13250/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 观察者可接收事件数量:128
2021-07-04 11:48:53.673 13219-13250/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件1,观察者可接收事件数量:128
2021-07-04 11:48:53.675 13219-13250/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件2,观察者可接收事件数量:127
2021-07-04 11:48:53.675 13219-13250/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件3,观察者可接收事件数量:126
2021-07-04 11:48:53.675 13219-13250/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件4,观察者可接收事件数量:125
2021-07-04 11:48:53.675 13219-13250/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件5,观察者可接收事件数量:124
... ...
2021-07-04 11:48:53.693 13219-13250/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送了事件128,观察者可接收事件数量:1
2021-07-04 11:48:53.693 13219-13250/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 不再发送
2021-07-04 11:49:30.455 13219-13219/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件1
2021-07-04 11:49:30.455 13219-13219/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件2
2021-07-04 11:49:30.460 13219-13219/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件3
2021-07-04 11:49:30.460 13219-13219/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件4
2021-07-04 11:49:30.461 13219-13219/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件5

5.采用背压策略模式:BackpressureStrategy

当缓存区大小存满、被观察者仍然继续发送下1个事件时,该如何处理的策略方式。

1.直接抛出异常MissingBackpressureException

private fun testFlowable14() {
    Flowable.create(FlowableOnSubscribe<Int> {
        for (i in 1..129) {
            Log.v(TAG, "发送事件$i");
            it.onNext(i)
        }
        it.onComplete()
    }, BackpressureStrategy.ERROR)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(object : Subscriber<Int> {
            override fun onSubscribe(s: Subscription?) {
                Log.v(TAG, "onSubscribe");
            }

            override fun onNext(t: Int?) {
                Log.v(TAG, "接收到了事件$t")
            }

            override fun onError(t: Throwable?) {
                Log.w(TAG, "onError: $t")
            }

            override fun onComplete() {
                Log.v(TAG, "onComplete");
            }

        })

}

... ...
2021-07-04 14:21:51.700 14830-14956/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件127
2021-07-04 14:21:51.700 14830-14956/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件128
2021-07-04 14:21:51.700 14830-14956/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件129
2021-07-04 14:21:51.702 14830-14830/com.example.rxjavademo W/MAINACTIVITY_RXJAVA: onError: io.reactivex.rxjava3.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests

2.BackpressureStrategy.MISSING

友好提示:缓存区满了

private fun testFlowable15() {
    Flowable.create(FlowableOnSubscribe<Int> {
        for (i in 1..129) {
            Log.v(TAG, "发送事件$i")
            it.onNext(i)
        }
        it.onComplete()
    }, BackpressureStrategy.MISSING)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(object : Subscriber<Int> {
            override fun onSubscribe(s: Subscription?) {
                Log.v(TAG, "onSubscribe");
            }

            override fun onNext(t: Int?) {
                Log.v(TAG, "接收到了事件$t")
            }

            override fun onError(t: Throwable?) {
                Log.w(TAG, "onError: $t")
            }

            override fun onComplete() {
                Log.v(TAG, "onComplete");
            }
        })
}

2021-07-04 14:41:13.044 15186-15235/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件128
2021-07-04 14:41:13.044 15186-15235/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件129
2021-07-04 14:41:13.048 15186-15186/com.example.rxjavademo W/MAINACTIVITY_RXJAVA: onError: io.reactivex.rxjava3.exceptions.MissingBackpressureException: Queue is full?!

3.BackpressureStrategy.BUFFER

将缓存区大小设置成无限大

private fun testFlowable16() {
    Flowable.create(FlowableOnSubscribe<Int> {
        for (i in 1..150) {
            Log.v(TAG, "发送事件$i")
            it.onNext(i)
        }
        it.onComplete()
    }, BackpressureStrategy.BUFFER)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(object :Subscriber<Int> {
            override fun onSubscribe(s: Subscription?) {
                Log.v(TAG, "onSubscribe");
            }

            override fun onNext(t: Int?) {
                Log.v(TAG, "接收到了事件$t")
            }

            override fun onError(t: Throwable?) {
                Log.w(TAG, "onError: $t")
            }

            override fun onComplete() {
                Log.v(TAG, "onComplete");
            }
        })
}

... ...
2021-07-04 15:16:38.947 15488-15576/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件147
2021-07-04 15:16:38.947 15488-15576/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件148
2021-07-04 15:16:38.947 15488-15576/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件149
2021-07-04 15:16:38.947 15488-15576/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件150

4.BackpressureStrategy.DROP

超过缓存区大小(128)的事件丢弃

R.id.testFlowableBtn17 -> {
    mSubscription17?.request(128)
}
private var mSubscription17: Subscription? = null
private fun testFlowable17() {
    Flowable.create(FlowableOnSubscribe<Int> {
        for (i in 1..200) {
            Log.v(TAG, "发送事件$i")
            it.onNext(i)
        }
        it.onComplete()
    }, BackpressureStrategy.DROP)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(object : Subscriber<Int> {
            override fun onSubscribe(s: Subscription?) {
                Log.v(TAG, "onSubscribe")
                mSubscription17 = s
            }

            override fun onNext(t: Int?) {
                Log.v(TAG, "接收到了事件$t")
            }

            override fun onError(t: Throwable?) {
                Log.w(TAG, "onError: $t")
            }

            override fun onComplete() {
                Log.v(TAG, "onComplete");
            }
        })
}

... ...
2021-07-04 15:41:22.195 15851-15890/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件196
2021-07-04 15:41:22.195 15851-15890/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件197
2021-07-04 15:41:22.195 15851-15890/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件198
2021-07-04 15:41:22.195 15851-15890/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件199
2021-07-04 15:41:22.195 15851-15890/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件200
... ...
2021-07-04 15:41:33.829 15851-15851/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件125
2021-07-04 15:41:33.829 15851-15851/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件126
2021-07-04 15:41:33.829 15851-15851/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件127
2021-07-04 15:41:33.829 15851-15851/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件128
2021-07-04 15:41:33.833 15851-15851/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onComplete

5.BackpressureStrategy.LATEST

只保存最新(最后)事件,超过缓存区大小(128)的事件丢弃

private var mSubscription18: Subscription? = null
private fun testFlowable18() {
    Flowable.create(FlowableOnSubscribe<Int> {
        for (i in 1..200) {
            Log.v(TAG, "发送事件$i")
            it.onNext(i)
        }
        it.onComplete()
    }, BackpressureStrategy.LATEST)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(object : Subscriber<Int> {
            override fun onSubscribe(s: Subscription?) {
                Log.v(TAG, "onSubscribe")
                mSubscription18 = s
            }

            override fun onNext(t: Int?) {
                Log.v(TAG, "接收到了事件$t")
            }

            override fun onError(t: Throwable?) {
                Log.w(TAG, "onError: $t")
            }

            override fun onComplete() {
                Log.v(TAG, "onComplete")
            }
        })
}

... ...
2021-07-04 16:02:55.393 16130-16185/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件196
2021-07-04 16:02:55.393 16130-16185/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件197
2021-07-04 16:02:55.393 16130-16185/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件198
2021-07-04 16:02:55.393 16130-16185/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件199
2021-07-04 16:02:55.393 16130-16185/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 发送事件200
... ...
2021-07-04 16:02:57.489 16130-16130/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件126
2021-07-04 16:02:57.489 16130-16130/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件127
2021-07-04 16:02:57.489 16130-16130/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件128
2021-07-04 16:03:25.229 16130-16130/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: 接收到了事件200
2021-07-04 16:03:25.233 16130-16130/com.example.rxjavademo V/MAINACTIVITY_RXJAVA: onComplete

以上是关于Rxjava 整理(未完)的主要内容,如果未能解决你的问题,请参考以下文章

Rxjava 整理(未完)

如何取消订阅RxKotlin / RxJava中的Flowable?

知识整理这可能是最好的RxJava 2.x 入门教程

RxJava2

知识整理这可能是最好的RxJava 2.x 入门教程

知识整理这可能是最好的RxJava 2.x 入门教程