RxJava Subscription 自动取消订阅
Posted skiwnchhw
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava Subscription 自动取消订阅相关的知识,希望对你有一定的参考价值。
在RxJava Observer与Subscriber的关系 一文中,我们提到:
subscribe(mObserver)和subscribe(mSubscriber)执行结果就会有区别:
- subscribe(mSubscriber)这种订阅方式在第二次请求数据时就不会执行了,原因就是第一次onNext后自动取消了订阅;
- subscribe(mObserver)则不出现此问题。
今天我们分析下原因。
先来看看源码。
Subscription
RxJava中有个叫做Subscription的接口,可以用来取消订阅.
public interface Subscription {
/**
* Stops the receipt of notifications on the {@link Subscriber} that was registered when this Subscription
* was received.
* <p>
* This allows unregistering an {@link Subscriber} before it has finished receiving all events (i.e. before
* onCompleted is called).
*/
void unsubscribe();
/**
* Indicates whether this {@code Subscription} is currently unsubscribed.
*
* @return {@code true} if this {@code Subscription} is currently unsubscribed, {@code false} otherwise
*/
boolean isUnsubscribed();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
从上面可以看到,我们只需要调用unsubscribe就可以取消订阅,那么如何得到一个Subscription对象呢?
其实很简单:
Observable.subscribe()方法可以返回一个Subscription的对象,即我们每次订阅都会返回.
感觉Subscription就像一个订单,你下单了就会生成一个订单,而你也可以用这个订单取消订单.
实战
我先写了以下代码来测试:
Subscription subscription = Observable.just("Hello subscription")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
System.out.println(subscription.isUnsubscribed());
subscription.unsubscribe();
System.out.println(subscription.isUnsubscribed());
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
在我想来输出的日志应该是这样的:
Hello subscription
false
true
- 1
- 2
- 3
但是,结果出乎我的意料,我运行之后是这样的:
Hello subscription
true
true
- 1
- 2
- 3
明明我没有取消订阅啊,怎么就true了呢?
接下去我进源码探索了一下发现:
在Observable.subscribe()源码:
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// validate and proceed
if (subscriber == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that‘s not the appropriate approach
* so I won‘t mention that in the exception
*/
}
// new Subscriber so onStart it
subscriber.onStart();
/*
* See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// in case the subscriber can‘t listen to exceptions anymore
if (subscriber.isUnsubscribed()) {
RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
} else {
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren‘t we throwing the hook‘s return value.
throw r;
}
}
return Subscriptions.unsubscribed();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
关键代码:
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
- 1
- 2
- 3
- 4
它会把我们传递的subscriber转成SafeSubscriber,SafeSubcriber源码如下:
public class SafeSubscriber<T> extends Subscriber<T> {
private final Subscriber<? super T> actual;
boolean done = false;
public SafeSubscriber(Subscriber<? super T> actual) {
super(actual);
this.actual = actual;
}
/**
* Notifies the Subscriber that the {@code Observable} has finished sending push-based notifications.
* <p>
* The {@code Observable} will not call this method if it calls {@link #onError}.
*/
@Override
public void onCompleted() {
if (!done) {
done = true;
try {
actual.onCompleted();
} catch (Throwable e) {
// we handle here instead of another method so we don‘t add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwIfFatal(e);
RxJavaPluginUtils.handleException(e);
throw new OnCompletedFailedException(e.getMessage(), e);
} finally {
try {
// Similarly to onError if failure occurs in unsubscribe then Rx contract is broken
// and we throw an UnsubscribeFailureException.
unsubscribe();
} catch (Throwable e) {
RxJavaPluginUtils.handleException(e);
throw new UnsubscribeFailedException(e.getMessage(), e);
}
}
}
}
/**
* Notifies the Subscriber that the {@code Observable} has experienced an error condition.
* <p>
* If the {@code Observable} calls this method, it will not thereafter call {@link #onNext} or
* {@link #onCompleted}.
*
* @param e
* the exception encountered by the Observable
*/
@Override
public void onError(Throwable e) {
// we handle here instead of another method so we don‘t add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwIfFatal(e);
if (!done) {
done = true;
_onError(e);
}
}
/**
* Provides the Subscriber with a new item to observe.
* <p>
* The {@code Observable} may call this method 0 or more times.
* <p>
* The {@code Observable} will not call this method again after it calls either {@link #onCompleted} or
* {@link #onError}.
*
* @param args
* the item emitted by the Observable
*/
@Override
public void onNext(T args) {
try {
if (!done) {
actual.onNext(args);
}
} catch (Throwable e) {
// we handle here instead of another method so we don‘t add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwOrReport(e, this);
}
}
/**
* The logic for {@code onError} without the {@code isFinished} check so it can be called from within
* {@code onCompleted}.
*
* @see <a href="https://github.com/ReactiveX/RxJava/issues/630">the report of this bug</a>
*/
protected void _onError(Throwable e) {
RxJavaPluginUtils.handleException(e);
try {
actual.onError(e);
} catch (Throwable e2) {
if (e2 instanceof OnErrorNotImplementedException) {
/*
* onError isn‘t implemented so throw
*
* https://github.com/ReactiveX/RxJava/issues/198
*
* Rx Design Guidelines 5.2
*
* "when calling the Subscribe method that only has an onNext argument, the OnError behavior
* will be to rethrow the exception on the thread that the message comes out from the observable
* sequence. The OnCompleted behavior in this case is to do nothing."
*/
try {
unsubscribe();
} catch (Throwable unsubscribeException) {
RxJavaPluginUtils.handleException(unsubscribeException);
throw new RuntimeException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException)));
}
throw (OnErrorNotImplementedException) e2;
} else {
/*
* throw since the Rx contract is broken if onError failed
*
* https://github.com/ReactiveX/RxJava/issues/198
*/
RxJavaPluginUtils.handleException(e2);
try {
unsubscribe();
} catch (Throwable unsubscribeException) {
RxJavaPluginUtils.handleException(unsubscribeException);
throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException)));
}
throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));
}
}
// if we did not throw above we will unsubscribe here, if onError failed then unsubscribe happens in the catch
try {
unsubscribe();
} catch (RuntimeException unsubscribeException) {
RxJavaPluginUtils.handleException(unsubscribeException);
throw new OnErrorFailedException(unsubscribeException);
}
}
/**
* Returns the {@link Subscriber} underlying this {@code SafeSubscriber}.
*
* @return the {@link Subscriber} that was used to create this {@code SafeSubscriber}
*/
public Subscriber<? super T> getActual() {
return actual;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
- 145
- 146
- 147
- 148
- 149
- 150
- 151
从源码中不能发现:
public void onCompleted() {
if (!done) {
done = true;
try {
actual.onCompleted();
} catch (Throwable e) {
// we handle here instead of another method so we don‘t add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwIfFatal(e);
// handle errors if the onCompleted implementation fails, not just if the Observable fails
_onError(e);
} finally {
// auto-unsubscribe
unsubscribe();
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
原来它在finally里自动取消了订阅!!
同样,在出现error时,也会自动取消订阅。
protected void _onError(Throwable e) {
RxJavaPluginUtils.handleException(e);
try {
actual.onError(e);
} catch (Throwable e2) {
if (e2 instanceof OnErrorNotImplementedException) {
/*
* onError isn‘t implemented so throw
*
* https://github.com/ReactiveX/RxJava/issues/198
*
* Rx Design Guidelines 5.2
*
* "when calling the Subscribe method that only has an onNext argument, the OnError behavior
* will be to rethrow the exception on the thread that the message comes out from the observable
* sequence. The OnCompleted behavior in this case is to do nothing."
*/
try {
unsubscribe();
} catch (Throwable unsubscribeException) {
RxJavaPluginUtils.handleException(unsubscribeException);
throw new RuntimeException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException)));
}
throw (OnErrorNotImplementedException) e2;
} else {
/*
* throw since the Rx contract is broken if onError failed
*
* https://github.com/ReactiveX/RxJava/issues/198
*/
RxJavaPluginUtils.handleException(e2);
try {
unsubscribe();
} catch (Throwable unsubscribeException) {
RxJavaPluginUtils.handleException(unsubscribeException);
throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException)));
}
throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));
}
}
// if we did not throw above we will unsubscribe here, if onError failed then unsubscribe happens in the catch
try {
unsubscribe();
} catch (RuntimeException unsubscribeException) {
RxJavaPluginUtils.handleException(unsubscribeException);
throw new OnErrorFailedException(unsubscribeException);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
本文参考了http://www.jianshu.com/p/c26f96c0ab81
困扰我了好久的问题终于解决了,呜呼!
可以结合下面文章来看:
http://blog.csdn.net/jdsjlzx/article/details/51534504
再分享一下我老师大神的人工智能教程吧。零基础!通俗易懂!风趣幽默!还带黄段子!希望你也加入到我们人工智能的队伍中来!http://www.captainbed.net
!-->以上是关于RxJava Subscription 自动取消订阅的主要内容,如果未能解决你的问题,请参考以下文章