RxJava Subscription: Automatic Unsubscription

Posted skiwnchhw


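In the article "The Relationship Between RxJava's Observer and Subscriber", we noted:

subscribe(mObserver) and subscribe(mSubscriber) can produce different results:

  • With subscribe(mSubscriber), the second data request never runs, because the subscription was cancelled automatically after the first onNext;
  • subscribe(mObserver) does not have this problem.

Today we look at why.

Let's start with the source code.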

Subscription
RxJava has an interface called Subscription, which can be used to cancel a subscription.

public interface Subscription {
    /**
     * Stops the receipt of notifications on the {@link Subscriber} that was registered when this Subscription
     * was received.
     * <p>
     * This allows unregistering an {@link Subscriber} before it has finished receiving all events (i.e. before
     * onCompleted is called).
     */
    void unsubscribe();

    /**
     * Indicates whether this {@code Subscription} is currently unsubscribed.
     *
     * @return {@code true} if this {@code Subscription} is currently unsubscribed, {@code false} otherwise
     */
    boolean isUnsubscribed();
}

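As shown above, cancelling a subscription only takes a call to unsubscribe. So how do we get hold of a Subscription object?

It is quite simple:
Observable.subscribe() returns a Subscription object, so every subscription we make gives us one.
A Subscription is a bit like an order: placing an order produces an order record, and you can use that record to cancel the order.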
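
To make the order analogy concrete, here is a minimal sketch of the contract. It deliberately does not use RxJava: SimpleSubscription and FlagSubscription are hypothetical names that only mirror the two methods of the interface shown above, so treat it as an illustration rather than the library's implementation.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified model, NOT RxJava: every name below is hypothetical.
class SubscriptionContractDemo {

    // Stand-in for rx.Subscription: the same two methods, nothing else.
    interface SimpleSubscription {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    // Flag-backed implementation: cancelling more than once is harmless.
    static final class FlagSubscription implements SimpleSubscription {
        private final AtomicBoolean unsubscribed = new AtomicBoolean(false);

        @Override
        public void unsubscribe() {
            unsubscribed.set(true);
        }

        @Override
        public boolean isUnsubscribed() {
            return unsubscribed.get();
        }
    }

    public static void main(String[] args) {
        SimpleSubscription order = new FlagSubscription();
        System.out.println(order.isUnsubscribed()); // false
        order.unsubscribe();
        order.unsubscribe(); // second cancel changes nothing
        System.out.println(order.isUnsubscribed()); // true
    }
}
```

The point of the sketch is only that the "order" starts active and stays cancelled once unsubscribe has been called.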

In practice

I first wrote the following code to test it:

Subscription subscription = Observable.just("Hello subscription")
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        });
System.out.println(subscription.isUnsubscribed());
subscription.unsubscribe();
System.out.println(subscription.isUnsubscribed());

I expected the log output to look like this:

Hello subscription
false
true

To my surprise, this is what I actually got when I ran it:

Hello subscription
true
true

I never cancelled the subscription, so why is it already true?

Digging into the source, I found the following.
The source of Observable.subscribe():

public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
     // validate and proceed
        if (subscriber == null) {
            throw new IllegalArgumentException("observer can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that's not the appropriate approach
             * so I won't mention that in the exception
             */
        }

        // new Subscriber so onStart it
        subscriber.onStart();

        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same an unsafeSubscribe but not used because it would 
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // in case the subscriber can't listen to exceptions anymore
            if (subscriber.isUnsubscribed()) {
                RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
            } else {
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(hook.onSubscribeError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    hook.onSubscribeError(r);
                    // TODO why aren't we throwing the hook's return value.
                    throw r;
                }
            }
            return Subscriptions.unsubscribed();
        }
    }

The key code:

if (!(subscriber instanceof SafeSubscriber)) {
    // assign to `observer` so we return the protected version
    subscriber = new SafeSubscriber<T>(subscriber);
}

It wraps the subscriber we pass in inside a SafeSubscriber. The source of SafeSubscriber is as follows:

public class SafeSubscriber<T> extends Subscriber<T> {

    private final Subscriber<? super T> actual;

    boolean done = false;

    public SafeSubscriber(Subscriber<? super T> actual) {
        super(actual);
        this.actual = actual;
    }

    /**
     * Notifies the Subscriber that the {@code Observable} has finished sending push-based notifications.
     * <p>
     * The {@code Observable} will not call this method if it calls {@link #onError}.
     */
    @Override
    public void onCompleted() {
        if (!done) {
            done = true;
            try {
                actual.onCompleted();
            } catch (Throwable e) {
                // we handle here instead of another method so we don't add stacks to the frame
                // which can prevent it from being able to handle StackOverflow
                Exceptions.throwIfFatal(e);
                RxJavaPluginUtils.handleException(e);
                throw new OnCompletedFailedException(e.getMessage(), e);
            } finally {
                try {
                    // Similarly to onError if failure occurs in unsubscribe then Rx contract is broken
                    // and we throw an UnsubscribeFailureException.
                    unsubscribe();
                } catch (Throwable e) {
                    RxJavaPluginUtils.handleException(e);
                    throw new UnsubscribeFailedException(e.getMessage(), e);
                }
            }
        }
    }

    /**
     * Notifies the Subscriber that the {@code Observable} has experienced an error condition.
     * <p>
     * If the {@code Observable} calls this method, it will not thereafter call {@link #onNext} or
     * {@link #onCompleted}.
     * 
     * @param e
     *          the exception encountered by the Observable
     */
    @Override
    public void onError(Throwable e) {
        // we handle here instead of another method so we don't add stacks to the frame
        // which can prevent it from being able to handle StackOverflow
        Exceptions.throwIfFatal(e);
        if (!done) {
            done = true;
            _onError(e);
        }
    }

    /**
     * Provides the Subscriber with a new item to observe.
     * <p>
     * The {@code Observable} may call this method 0 or more times.
     * <p>
     * The {@code Observable} will not call this method again after it calls either {@link #onCompleted} or
     * {@link #onError}.
     * 
     * @param args
     *          the item emitted by the Observable
     */
    @Override
    public void onNext(T args) {
        try {
            if (!done) {
                actual.onNext(args);
            }
        } catch (Throwable e) {
            // we handle here instead of another method so we don't add stacks to the frame
            // which can prevent it from being able to handle StackOverflow
            Exceptions.throwOrReport(e, this);
        }
    }

    /**
     * The logic for {@code onError} without the {@code isFinished} check so it can be called from within
     * {@code onCompleted}.
     * 
     * @see <a href="https://github.com/ReactiveX/RxJava/issues/630">the report of this bug</a>
     */
    protected void _onError(Throwable e) {
        RxJavaPluginUtils.handleException(e);
        try {
            actual.onError(e);
        } catch (Throwable e2) {
            if (e2 instanceof OnErrorNotImplementedException) {
                /*
                 * onError isn't implemented so throw
                 * 
                 * https://github.com/ReactiveX/RxJava/issues/198
                 * 
                 * Rx Design Guidelines 5.2
                 * 
                 * "when calling the Subscribe method that only has an onNext argument, the OnError behavior
                 * will be to rethrow the exception on the thread that the message comes out from the observable
                 * sequence. The OnCompleted behavior in this case is to do nothing."
                 */
                try {
                    unsubscribe();
                } catch (Throwable unsubscribeException) {
                    RxJavaPluginUtils.handleException(unsubscribeException);
                    throw new RuntimeException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException)));
                }
                throw (OnErrorNotImplementedException) e2;
            } else {
                /*
                 * throw since the Rx contract is broken if onError failed
                 * 
                 * https://github.com/ReactiveX/RxJava/issues/198
                 */
                RxJavaPluginUtils.handleException(e2);
                try {
                    unsubscribe();
                } catch (Throwable unsubscribeException) {
                    RxJavaPluginUtils.handleException(unsubscribeException);
                    throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException)));
                }

                throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));
            }
        }
        // if we did not throw above we will unsubscribe here, if onError failed then unsubscribe happens in the catch
        try {
            unsubscribe();
        } catch (RuntimeException unsubscribeException) {
            RxJavaPluginUtils.handleException(unsubscribeException);
            throw new OnErrorFailedException(unsubscribeException);
        }
    }

    /**
     * Returns the {@link Subscriber} underlying this {@code SafeSubscriber}.
     *
     * @return the {@link Subscriber} that was used to create this {@code SafeSubscriber}
     */
    public Subscriber<? super T> getActual() {
        return actual;
    }
}
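
The two details in this listing that matter most for our question are the done flag and the super(actual) call, which makes the wrapper share its subscription state with the subscriber it wraps. The sketch below is a stripped-down model of just those two ideas, not the RxJava source; Sub and SafeSub are hypothetical names, and error handling is left out on purpose.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified model, NOT the RxJava source: every name below is hypothetical.
class SafeWrapperSketch {

    // Base subscriber that owns (or shares) an "unsubscribed" flag.
    static class Sub {
        final AtomicBoolean unsubscribed;
        final List<String> received = new ArrayList<>();

        Sub() {
            this.unsubscribed = new AtomicBoolean(false);
        }

        // Mirrors super(actual): the wrapper reuses the wrapped subscriber's flag.
        Sub(Sub shareWith) {
            this.unsubscribed = shareWith.unsubscribed;
        }

        void onNext(String item) { received.add(item); }
        void onCompleted() { received.add("<completed>"); }
        void unsubscribe() { unsubscribed.set(true); }
        boolean isUnsubscribed() { return unsubscribed.get(); }
    }

    // Wrapper with a done flag, modelled on the listing above.
    static class SafeSub extends Sub {
        private final Sub actual;
        private boolean done = false;

        SafeSub(Sub actual) {
            super(actual);
            this.actual = actual;
        }

        @Override
        void onNext(String item) {
            if (!done) {
                actual.onNext(item); // items after a terminal event are dropped
            }
        }

        @Override
        void onCompleted() {
            if (!done) {
                done = true;
                try {
                    actual.onCompleted();
                } finally {
                    unsubscribe(); // auto-unsubscribe on completion
                }
            }
        }
    }

    public static void main(String[] args) {
        Sub inner = new Sub();
        SafeSub safe = new SafeSub(inner);
        safe.onNext("first");
        safe.onCompleted();
        safe.onNext("ignored");
        System.out.println(inner.received);         // [first, <completed>]
        System.out.println(inner.isUnsubscribed()); // true
    }
}
```

Because the flag is shared, cancelling through the wrapper is also visible on the wrapped subscriber, which is the behaviour the rest of this article relies on.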

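From the source it is easy to see the following (this excerpt comes from a slightly different version of onCompleted than the full listing above, but both call unsubscribe() in the finally block):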

public void onCompleted() {
    if (!done) {
        done = true;
        try {
            actual.onCompleted();
        } catch (Throwable e) {
            // we handle here instead of another method so we don't add stacks to the frame
            // which can prevent it from being able to handle StackOverflow
            Exceptions.throwIfFatal(e);
            // handle errors if the onCompleted implementation fails, not just if the Observable fails
            _onError(e);
        } finally {
            // auto-unsubscribe
            unsubscribe();
        }
    }
}

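So it automatically unsubscribes in the finally block! Because Observable.just emits its item and completes synchronously inside subscribe(), onCompleted has already run, and the subscription is already cancelled, by the time subscribe() returns. That is why isUnsubscribed() printed true before I ever called unsubscribe().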
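
The timing is easier to see in a small model. The sketch below does not use RxJava; Handle, subscribeToJust and ManualSource are hypothetical names. It contrasts a source that completes inside subscribe (like Observable.just) with one that completes only when told to later.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Simplified model, NOT RxJava: every name below is hypothetical.
class CompletionTimingSketch {

    // Handle returned by subscribing, playing the role of a Subscription.
    static final class Handle {
        private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
        void unsubscribe() { unsubscribed.set(true); }
        boolean isUnsubscribed() { return unsubscribed.get(); }
    }

    // Like Observable.just(item): emits and terminates before returning.
    static Handle subscribeToJust(String item, Consumer<String> onNext) {
        Handle handle = new Handle();
        try {
            onNext.accept(item);
        } finally {
            handle.unsubscribe(); // the stream has ended, so the handle is cancelled
        }
        return handle;
    }

    // A source that stays open until complete() is called.
    static final class ManualSource {
        private final Handle handle = new Handle();
        Handle subscribe() { return handle; }
        void complete() { handle.unsubscribe(); }
    }

    public static void main(String[] args) {
        Handle sync = subscribeToJust("Hello subscription", System.out::println);
        System.out.println(sync.isUnsubscribed());    // true

        ManualSource source = new ManualSource();
        Handle pending = source.subscribe();
        System.out.println(pending.isUnsubscribed()); // false
        source.complete();
        System.out.println(pending.isUnsubscribed()); // true
    }
}
```

In other words, the false-then-true output I originally expected would only appear with a source that is still open when subscribe returns.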

Likewise, the subscription is cancelled automatically when an error occurs.

protected void _onError(Throwable e) {
        RxJavaPluginUtils.handleException(e);
        try {
            actual.onError(e);
        } catch (Throwable e2) {
            if (e2 instanceof OnErrorNotImplementedException) {
                /*
                 * onError isn't implemented so throw
                 * 
                 * https://github.com/ReactiveX/RxJava/issues/198
                 * 
                 * Rx Design Guidelines 5.2
                 * 
                 * "when calling the Subscribe method that only has an onNext argument, the OnError behavior
                 * will be to rethrow the exception on the thread that the message comes out from the observable
                 * sequence. The OnCompleted behavior in this case is to do nothing."
                 */
                try {
                    unsubscribe();
                } catch (Throwable unsubscribeException) {
                    RxJavaPluginUtils.handleException(unsubscribeException);
                    throw new RuntimeException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException)));
                }
                throw (OnErrorNotImplementedException) e2;
            } else {
                /*
                 * throw since the Rx contract is broken if onError failed
                 * 
                 * https://github.com/ReactiveX/RxJava/issues/198
                 */
                RxJavaPluginUtils.handleException(e2);
                try {
                    unsubscribe();
                } catch (Throwable unsubscribeException) {
                    RxJavaPluginUtils.handleException(unsubscribeException);
                    throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException)));
                }

                throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));
            }
        }
        // if we did not throw above we will unsubscribe here, if onError failed then unsubscribe happens in the catch
        try {
            unsubscribe();
        } catch (RuntimeException unsubscribeException) {
            RxJavaPluginUtils.handleException(unsubscribeException);
            throw new OnErrorFailedException(unsubscribeException);
        }
    }
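
This also suggests an explanation for the difference described at the start. A Subscriber carries its own subscription state, so once a terminal event has cancelled it, reusing the same instance for a second request delivers nothing. My understanding of RxJava 1.x is that subscribe(Observer) wraps the observer in a new Subscriber on every call, so the state is fresh each time; I have not shown that source here, so take it as an assumption. The sketch below models the idea without RxJava, and every name in it is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model, NOT RxJava: every name below is hypothetical.
class ReuseSketch {

    // Plain callbacks with no subscription state, playing the role of an Observer.
    interface Callbacks {
        void onNext(String item);
        void onError(Throwable error);
    }

    // Callbacks plus an "unsubscribed" flag, playing the role of a Subscriber.
    static class StatefulSubscriber implements Callbacks {
        final List<String> log = new ArrayList<>();
        private boolean unsubscribed = false;

        @Override public void onNext(String item) { log.add(item); }
        @Override public void onError(Throwable error) { log.add("error: " + error.getMessage()); }
        void unsubscribe() { unsubscribed = true; }
        boolean isUnsubscribed() { return unsubscribed; }
    }

    // One "request": emit an item, then end with an error.
    // The terminal event unsubscribes; an unsubscribed subscriber gets nothing.
    static void requestWithSubscriber(String item, StatefulSubscriber subscriber) {
        if (subscriber.isUnsubscribed()) {
            return;
        }
        subscriber.onNext(item);
        try {
            subscriber.onError(new RuntimeException("boom"));
        } finally {
            subscriber.unsubscribe();
        }
    }

    // Plain callbacks are wrapped in a fresh stateful subscriber on every call.
    static void requestWithObserver(String item, Callbacks callbacks) {
        StatefulSubscriber fresh = new StatefulSubscriber() {
            @Override public void onNext(String i) { callbacks.onNext(i); }
            @Override public void onError(Throwable e) { callbacks.onError(e); }
        };
        requestWithSubscriber(item, fresh);
    }

    public static void main(String[] args) {
        StatefulSubscriber reused = new StatefulSubscriber();
        requestWithSubscriber("first", reused);
        requestWithSubscriber("second", reused); // dropped: already unsubscribed
        System.out.println(reused.log); // [first, error: boom]

        List<String> seen = new ArrayList<>();
        Callbacks observer = new Callbacks() {
            @Override public void onNext(String item) { seen.add(item); }
            @Override public void onError(Throwable error) { seen.add("error: " + error.getMessage()); }
        };
        requestWithObserver("first", observer);
        requestWithObserver("second", observer);
        System.out.println(seen); // [first, error: boom, second, error: boom]
    }
}
```

If a Subscriber has to be used for repeated requests, the practical takeaway is to create a new instance per subscription rather than keeping one in a field.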

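This article draws on http://www.jianshu.com/p/c26f96c0ab81

A problem that had puzzled me for a long time is finally solved. Phew!

It can be read alongside the following article:
http://blog.csdn.net/jdsjlzx/article/details/51534504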
