容易被忽略的知识点:RxJava操作符的线程安全

Posted 涂程

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了容易被忽略的知识点:RxJava操作符的线程安全相关的知识,希望对你有一定的参考价值。

好文推荐:
作者:fundroid

随着RxJava的深入使用,渐渐发现了一个令人不安的真相:

"RxJava的大部分操作符并非线程安全的。"

在一些多线程场景下对RxJava的滥用会发生不符合预期的现象。

Rx操作符并非线程安全

很多人对RxJava的定义是一个异步响应式框架,既然是为异步处理而生的框架线程不安全?

是的,支持异步并不意味着支持并发

线程不安全会发生什么问题呢?RxJava中的大部分操作符都是线程不安全的, 当多线程同时像一个stream发射数据时,操作符的结果可能不符合预期。

以一个常用的操作符 take(n) 为例:

@JvmStatic
fun main(args: Array<String>) {
   val numberOfThreads = 10
   repeat(1000) {
       println("Iteration = $it")

       val publishSubject = PublishSubject.create<Int>() 

       val actuallyReceived = AtomicInteger()

       publishSubject.take(3).subscribe { 
       		actuallyReceived.incrementAndGet() 
       }

       val latch = CountDownLatch(numberOfThreads)
       var threads = listOf<Thread>()

       (0..numberOfThreads).forEach {
            threads += thread(start = false) {
                publishSubject.onNext(it)
                latch.countDown()
            }
        }

        threads.forEach { it.start() }
        latch.await()

        check(actuallyReceived.get() == 3)
    }
}

执行上面代码,由于take的结果不符合预期,总是会异常退出

看一下take的源码:

public final class ObservableTake<T> extends AbstractObservableWithUpstream<T, T> {
    final long limit;

    public ObservableTake(ObservableSource<T> source, long limit) {
        super(source);
        this.limit = limit;
    }
    protected void subscribeActual(Observer<? super T> observer) {
        this.source.subscribe(new ObservableTake.TakeObserver(observer, this.limit));
    }

    static final class TakeObserver<T> implements Observer<T>, Disposable {
        final Observer<? super T> downstream;
        boolean done;
        Disposable upstream;
        long remaining;

        TakeObserver(Observer<? super T> actual, long limit) {
            this.downstream = actual;
            this.remaining = limit;
        }

        public void onNext(T t) {
            if (!this.done && this.remaining-- > 0L) {
                boolean stop = this.remaining == 0L;
                this.downstream.onNext(t);
                if (stop) {
                    this.onComplete();
                }
            }

        }
    }
}

果然不出所料,remaining--没有任何锁操作,无法保证线程安全。

The Observable Contract

Rx在对Observable的定义中已经明确告诉我们了:

Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.

reactivex.io/documentati…

happens-before relationship 需要我们保证进入stream数据的先后顺序,避免并发行为。根据官方的解释,这样做可以避免一些有锁操作带来的性能下降,因此仅在必要的时候才确保线程安全。

操作符的线程安全

那么哪些操作符是线程安全的呢?

RxJava的操作符种类繁多,一个一个记忆很难,基本上可以按照这个原则区分:

  • 操作单个Observable的操作符都不是线程不安全的,例如常用的 take(n)map()distinctUntilChanged() 等,但是带有scheduler参数的除外,例如 window(…, scheduler)debounce(…, scheduler)
  • 操作多个Observable的操作符是线程安全的,例如 merge()combineLatest()zip()

用代码描述大概是这种感觉:

fun operatorThreadSafety() = if (operator.worksWithOneObservable() &&  
    operator.supportsScheduling == false) {
    Operator.NOT_THREAD_SAFE_AND_THAT_IS_OK
} else {
    Operator.MOST_LIKELY_THREAD_SAFE
}

Subject的线程安全

相对于操作符的线程安全,个人认为Subject的使用更需要大家注意。常用的Subject都不是线程安全的(SerializedSubject除外),而最容易出现并发操作的场景恰恰是Subject,例如我们经常会使用Subject作为中继器,异步onNext向Subject发射数据。前面take的例子便是这种场景。

更要命的是我们常配合observeOn来进行线程切换,而observeOn本身也并非线程安全的,翻看其源码会发现,observeOn在切线程时使用了一个线程不安全的队列

queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);

因此,下面的代码在并发环境中必然会发生问题:

@JvmStatic
fun main(args: Array<String>) {
    val numberOfThreads = 10000

 	  val publishSubject = PublishSubject.create<Int>()
    val actuallyReceived = AtomicInteger()

    publishSubject
        .observeOn(androidSchedulers.mainThread())
        .subscribe {
            actuallyReceived.incrementAndGet()
        }

    val latch = CountDownLatch(numberOfThreads)
    var threads = listOf<Thread>()

    (0..numberOfThreads).forEach {
        threads += thread(start = false) {
            publishSubject.onNext(it)
            latch.countDown()
        }
    }

    threads.forEach { it.start() }
    latch.await()

    print("actuallyReceived: $actuallyReceived")

}

由于observeOn切了线程,结果总是会漏掉几个数据

使用SerializedSubject

在并发环境中,使用toSerialized转成SerializedSubject,可以避免上述问题

最后

RxJava处于设计和实现上的考虑,很多操作符以及Subject都不是线程安全的,作为开发者需要尽量遵守The Observable Contract,避免在并发环境下使用,如果出现并发使用时,使用Observable.serialize() 或者 Subject.toSerialized() 保证线程安全。

大家如果还想了解更多Android 相关的更多知识点,可以点进我的GitHub项目中:https://github.com/733gh/GH-Android-Review-master自行查看,里面记录了许多的Android 知识点。最后还请大家点点赞支持下!!!

以上是关于容易被忽略的知识点:RxJava操作符的线程安全的主要内容,如果未能解决你的问题,请参考以下文章

结构体的一些容易被忽略的重要知识

RxJava2线程切换原理分析

Rxjava - 操作符,线程操作的简单使用

RxJava 线程模型分析

Javascript知识汇总------js中容易被忽略的细节(持续更新)

Sqlserver中一直在用又经常被忽略的知识点一