浅谈RxJava与多线程并发

Posted 栀夏暖阳

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了浅谈RxJava与多线程并发相关的知识,希望对你有一定的参考价值。

认识RxJava已经有一段时间了,但是一直没有机会在项目中尝试,最近同事在新的项目里引进了RxJava写一些事件处理,在review代码的时候发现了一些和多线程并发相关的问题,所以写了这篇文章。

前言

对于RxJava,大家应该都很熟悉,他最核心的两个字就是异步,诚然,它对异步的处理非常的出色,但是异步绝对不等于并发,更不等于线程安全,如果把这几个概念搞混了,错误的使用RxJava,是会来带非常多的问题的。

RxJava与并发

首先让我们来看一段RxJava协议的原文:

1
Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.

如上所述,RxJava对多线程并发其实并没有做非常的多保护,这段话中说,如果多个Observables从多个线程中发射数据,必须要满足happens-before原则。

下面来看一个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
final PublishSubject<Integer> subject = PublishSubject.create();

subject.subscribe(new Subscriber<Integer>() 
    @Override
    public void onCompleted() 

    

    @Override
    public void onError(Throwable e) 

    

    @Override
    public void onNext(Integer integer) 
        unSafeCount = unSafeCount + integer;
        Log.d("TAG", "onNext: " + unSafeCount);
    
);

findViewById(R.id.send).setOnClickListener(new View.OnClickListener() 
    @Override
    public void onClick(View v) 
        final int unit = 1;
        for(int i = 0;i < 10;i++) 
            new Thread(new Runnable() 
                @Override
                public void run() 
                    for (int j = 0; j < 1000; j++) 
                        subject.onNext(unit);
                    
                
            ).start();
        
    
);

这是一个最典型的多线程问题,从10个线程中发射数据并相加,这样最终得到的答案是小于10000的。虽然使用了RxJava,但是这样的使用对于并发是没有意义的,因为RxJava并没有去处理并发带来的问题。我们可以看下subject的onNext方法的源码,里面很简单,就是调用了对应observer的onNext方法而已。不止是这样,绝大多数的Subject都是线程不安全的,所以当你在使用这样的类的时候(典型场景就是自制的RxBus),如果从多个线程中发射数据,那你就要小心了。

对于这样的问题,有两种解决方案:

第一种就是简单的使用传统的解决方法,比如用AtomicInteger代替int。

第二种则是使用RxJava的解决方案,在这里就是用SerializedSubject去代替Subject:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
final PublishSubject<Integer> subject = PublishSubject.create();

subject.subscribe(new Subscriber<Integer>() 
    @Override
    public void onCompleted() 

    

    @Override
    public void onError(Throwable e) 

    

    @Override
    public void onNext(Integer integer) 
        unSafeCount = unSafeCount + integer;
        count.addAndGet(integer);

        Log.d("TAG", "onNext: " + count);
    
);

final SerializedSubject<Integer, Integer> ser = new SerializedSubject<Integer, Integer>(subject);

findViewById(R.id.send).setOnClickListener(new View.OnClickListener() 
    @Override
    public void onClick(View v) 
        final int unit = 1;

        for(int i = 0;i < 10;i++)
            new Thread(new Runnable() 
                @Override
                public void run() 
                    for(int j = 0;j < 1000;j++)
                        ser.onNext(unit);
                    
                
            ).start();
        
    
);

可以看一下SerializedSubject的onNext方法做了什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@Override
public void onNext(T t) 
    if (terminated) 
        return;
    
    synchronized (this) 
        if (terminated) 
            return;
        
        if (emitting) 
            FastList list = queue;
            if (list == null) 
                list = new FastList();
                queue = list;
            
            list.add(nl.next(t));
            return;
        
        emitting = true;
    
    try 
        actual.onNext(t);
     catch (Throwable e) 
        terminated = true;
        Exceptions.throwOrReport(e, actual, t);
        return;
    
    for (;;) 
        for (int i = 0; i < MAX_DRAIN_ITERATION; i++) 
            FastList list;
            synchronized (this) 
                list = queue;
                if (list == null) 
                    emitting = false;
                    return;
                
                queue = null;
            
            for (Object o : list.array) 
                if (o == null) 
                    break;
                
                try 
                    if (nl.accept(actual, o)) 
                        terminated = true;
                        return;
                    
                 catch (Throwable e) 
                    terminated = true;
                    Exceptions.throwIfFatal(e);
                    actual.onError(OnErrorThrowable.addValueAsLastCause(e, t));
                    return;
                
            
        
    

处理方式很简单,如果有其他线程在发射数据,那就将数据放置到队列中,等待下次发射。这保证了同一时间只会有一个线程调用onNext,onComplete和onError这些方法。

但是这样操作显然是会造成性能的影响的,所以RxJava并不会把所有的操作都打上线程安全的标签。

在这里就要引申出一个问题,那就是使用者对create方法的滥用,其实这个方法不应该被使用者频繁的调用的,因为你必须要小心的处理所有的数据发射,接收的逻辑。相反的,使用已有的操作符能很好的解决这个问题,所以下次大家在遇到问题的时候不要简单的使用create去自己写,而是应该想想有没有现成的操作符可以完成相应的需求。

RxJava中的一些操作符

RxJava中有一些操作符也和多线程并发有关,下面让我来讲一讲merge和concat,以及他们的一些变种操作符。

对于多线程发射数据,有时候我们需要得到的结果也保持和发射时候一样的顺序,这个时候如果我们使用merge这个操作符去结合多个发射源,那么就会产生一定的问题了(例子中做了非常不好的示范——使用了create操作符,请大家不要学习这样的写法,这里单纯是为了求证结果)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
Observable o1 = Observable.create(new Observable.OnSubscribe<Integer>() 
    @Override
    public void call(final Subscriber<? super Integer> subscriber) 
        new Thread(new Runnable() 
            @Override
            public void run() 
                try 
                    Thread.sleep(1000);
                    subscriber.onNext(1);
                    subscriber.onCompleted();
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
            
        ).start();
    
);
Observable o2 = Observable.create(new Observable.OnSubscribe<Integer>() 
    @Override
    public void call(Subscriber<? super Integer> subscriber) 
        subscriber.onNext(2);
        subscriber.onCompleted();
    
);

Observable.merge(o1,o2)
        .subscribe(new Subscriber<Integer>() 
            @Override
            public void onCompleted() 

            

            @Override
            public void onError(Throwable e) 

            

            @Override
            public void onNext(Integer i) 
                Log.d("TAG", "onNext: " + i);
            
        );

对于这样的场景,我们得到的答案将是2,1而不是先得到o1发射的数据,再获取o2的数据。

究其原因,就是因为merge其实就是给什么传什么,也不会去管数据发射的顺序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public void onNext(Observable<? extends T> t) 
    if (t == null) 
        return;
    
    if (t == Observable.empty()) 
        emitEmpty();
     else
    if (t instanceof ScalarSynchronousObservable) 
        tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
     else 
        InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
        addInner(inner);
        t.unsafeSubscribe(inner);
        emit();
    

可以看到在经过lift操作之后,对应的中间人MergeSubscriber的onNext,没有什么多余的代码,所以在多个Observable从多线程中发射数据的时候,顺序当然不能得到保证。

一个单词说明这个问题:interleaving——交错。merge后的数据源可能是交错的。由于merge有这样数据交错的问题,所以它的变种—flatMap也会有同样的问题。

对于这样的场景,我们可以使用concat操作符来完成:

1
Concat waits to subscribe to each additional Observable that you pass to it until the previous Observable completes.

根据文档,我们知道concat操作符是一个接一个的处理数据源的数据的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
if (wip.getAndIncrement() != 0) 
    return;


final int delayErrorMode = this.delayErrorMode;

for (;;) 
    if (actual.isUnsubscribed()) 
        return;
    

    if (!active) 
        if (delayErrorMode == BOUNDARY) 
            if (error.get() != null) 
                Throwable ex = ExceptionsUtils.terminate(error);
                if (!ExceptionsUtils.isTerminated(ex)) 
                    actual.onError(ex);
                
                return;
            
        

        boolean mainDone = done;
        Object v = queue.poll();
        boolean empty = v == null;

        if (mainDone && empty) 
            Throwable ex = ExceptionsUtils.terminate(error);
            if (ex == null) 
                actual.onCompleted();
             else
            if (!ExceptionsUtils.isTerminated(ex)) 
                actual.onError(ex);
            
            return;
        

        if (!empty) 

            Observable<? extends R> source;

            try 
                source = mapper.call(NotificationLite.<T>instance().getValue(v));
             catch (Throwable mapperError) 
                Exceptions.throwIfFatal(mapperError);
                drainError(mapperError);
                return;
            

            if (source == null) 
                drainError(new NullPointerException("The source returned by the mapper was null"));
                return;
            

            if (source != Observable.empty()) 

                if (source instanceof ScalarSynchronousObservable) 
                    ScalarSynchronousObservable<? extends R> scalarSource = (ScalarSynchronousObservable<? extends R>) source;

                    active = true;

                    arbiter.setProducer(new ConcatMapInnerScalarProducer<T, R>(scalarSource.get(), this));
                 else 
                    ConcatMapInnerSubscriber<T, R> innerSubscriber = new ConcatMapInnerSubscriber<T, R>(this);
                    inner.set(innerSubscriber);

                    if (!innerSubscriber.isUnsubscribed()) 
                        active = true;

                        source.unsafeSubscribe(innerSubscriber);
                     else 

以上是关于浅谈RxJava与多线程并发的主要内容,如果未能解决你的问题,请参考以下文章

浅谈RxJava源码解析(观察者),创建(createfromjust),变换(MapflatMap)线程调度

C++并发与多线程 4_创建多个线程数据共享问题分析

python-学习-python并发编程之多进程与多线程

并发与多线程相关知识点梳理

并发与多线程相关知识点梳理

并发与多线程相关知识点梳理

(c)2006-2024 SYSTEM All Rights Reserved IT常识