容易被忽略的知识点:RxJava操作符的线程安全
Posted 涂程
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了容易被忽略的知识点:RxJava操作符的线程安全相关的知识,希望对你有一定的参考价值。
好文推荐:
作者:fundroid
随着RxJava的深入使用,渐渐发现了一个令人不安的真相:
"RxJava的大部分操作符并非线程安全的。"
在一些多线程场景下对RxJava的滥用会发生不符合预期的现象。
Rx操作符并非线程安全
很多人对RxJava的定义是一个异步响应式框架,既然是为异步处理而生的框架线程不安全?
是的,支持异步并不意味着支持并发
线程不安全会发生什么问题呢?RxJava中的大部分操作符都是线程不安全的, 当多线程同时像一个stream发射数据时,操作符的结果可能不符合预期。
以一个常用的操作符 take(n) 为例:
@JvmStatic
fun main(args: Array<String>) {
val numberOfThreads = 10
repeat(1000) {
println("Iteration = $it")
val publishSubject = PublishSubject.create<Int>()
val actuallyReceived = AtomicInteger()
publishSubject.take(3).subscribe {
actuallyReceived.incrementAndGet()
}
val latch = CountDownLatch(numberOfThreads)
var threads = listOf<Thread>()
(0..numberOfThreads).forEach {
threads += thread(start = false) {
publishSubject.onNext(it)
latch.countDown()
}
}
threads.forEach { it.start() }
latch.await()
check(actuallyReceived.get() == 3)
}
}
执行上面代码,由于take
的结果不符合预期,总是会异常退出
看一下take的源码:
public final class ObservableTake<T> extends AbstractObservableWithUpstream<T, T> {
final long limit;
public ObservableTake(ObservableSource<T> source, long limit) {
super(source);
this.limit = limit;
}
protected void subscribeActual(Observer<? super T> observer) {
this.source.subscribe(new ObservableTake.TakeObserver(observer, this.limit));
}
static final class TakeObserver<T> implements Observer<T>, Disposable {
final Observer<? super T> downstream;
boolean done;
Disposable upstream;
long remaining;
TakeObserver(Observer<? super T> actual, long limit) {
this.downstream = actual;
this.remaining = limit;
}
public void onNext(T t) {
if (!this.done && this.remaining-- > 0L) {
boolean stop = this.remaining == 0L;
this.downstream.onNext(t);
if (stop) {
this.onComplete();
}
}
}
}
}
果然不出所料,remaining--
没有任何锁操作,无法保证线程安全。
The Observable Contract
Rx在对Observable的定义中已经明确告诉我们了:
Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.
happens-before relationship 需要我们保证进入stream数据的先后顺序,避免并发行为。根据官方的解释,这样做可以避免一些有锁操作带来的性能下降,因此仅在必要的时候才确保线程安全。
操作符的线程安全
那么哪些操作符是线程安全的呢?
RxJava的操作符种类繁多,一个一个记忆很难,基本上可以按照这个原则区分:
- 操作单个Observable的操作符都不是线程不安全的,例如常用的 take(n)、map()、distinctUntilChanged() 等,但是带有scheduler参数的除外,例如 window(…, scheduler)、debounce(…, scheduler) 等
- 操作多个Observable的操作符是线程安全的,例如 merge()、combineLatest()、zip()
用代码描述大概是这种感觉:
fun operatorThreadSafety() = if (operator.worksWithOneObservable() &&
operator.supportsScheduling == false) {
Operator.NOT_THREAD_SAFE_AND_THAT_IS_OK
} else {
Operator.MOST_LIKELY_THREAD_SAFE
}
Subject的线程安全
相对于操作符的线程安全,个人认为Subject的使用更需要大家注意。常用的Subject都不是线程安全的(SerializedSubject除外),而最容易出现并发操作的场景恰恰是Subject,例如我们经常会使用Subject作为中继器,异步onNext向Subject发射数据。前面take的例子便是这种场景。
更要命的是我们常配合observeOn来进行线程切换,而observeOn本身也并非线程安全的,翻看其源码会发现,observeOn在切线程时使用了一个线程不安全的队列
queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);
因此,下面的代码在并发环境中必然会发生问题:
@JvmStatic
fun main(args: Array<String>) {
val numberOfThreads = 10000
val publishSubject = PublishSubject.create<Int>()
val actuallyReceived = AtomicInteger()
publishSubject
.observeOn(androidSchedulers.mainThread())
.subscribe {
actuallyReceived.incrementAndGet()
}
val latch = CountDownLatch(numberOfThreads)
var threads = listOf<Thread>()
(0..numberOfThreads).forEach {
threads += thread(start = false) {
publishSubject.onNext(it)
latch.countDown()
}
}
threads.forEach { it.start() }
latch.await()
print("actuallyReceived: $actuallyReceived")
}
由于observeOn切了线程,结果总是会漏掉几个数据
使用SerializedSubject
在并发环境中,使用toSerialized转成SerializedSubject,可以避免上述问题
最后
RxJava处于设计和实现上的考虑,很多操作符以及Subject都不是线程安全的,作为开发者需要尽量遵守The Observable Contract,避免在并发环境下使用,如果出现并发使用时,使用Observable.serialize() 或者 Subject.toSerialized() 保证线程安全。
大家如果还想了解更多Android 相关的更多知识点,可以点进我的GitHub项目中:https://github.com/733gh/GH-Android-Review-master自行查看,里面记录了许多的Android 知识点。最后还请大家点点赞支持下!!!
以上是关于容易被忽略的知识点:RxJava操作符的线程安全的主要内容,如果未能解决你的问题,请参考以下文章