RxJava2:如何在处置订阅者后避免InterruptibleException?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava2:如何在处置订阅者后避免InterruptibleException?相关的知识,希望对你有一定的参考价值。
我有一个观察对象:
public Observable<List<Conversation>> getConversationListObservable()
return Observable.create(emitter ->
List<Conversation> conversations = networkApi.getConversations();
for (Conversation conversation : conversations)
if (emitter.isDisposed()) return; // this will cause subscription to terminate.
List<User> users = networkApi.getUserList(conversation.getId());
conversation.setUsers(users);
emitter.onNext(conversations);
emitter.onComplete();
);
androidx.lifecycle.ViewModel
类中使用的:
public class ConversationViewModel extends ViewModel
private CompositeDisposable disposables = new CompositeDisposable();
....
public void fetchConversationList()
disposables.add(repository.getConversationListObservable()
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(this::setConversations, this::onError));
[当我离开带有Conversation
列表的屏幕时,将放置此可观察的对象,但是我在logcat
类中的RxJavaPlugins.setErrorHandler
中从Application
中发出警告:
W/RxJavaPlugins.setErrorHandler - Undeliverable exception received, not sure what to do: java.lang.InterruptedException
在networkApi.getUserList
通话中。好像当我进行此网络呼叫时,我的订户并没有在网络呼叫开始时就被释放,而在我获得对呼叫的响应时已经被释放。除了从InterruptedException
类中删除此插件之外,是否有其他方法无法在RxJavaPlugins.setErrorHandler
中获得Application
?
P.S。:堆栈跟踪如下:
2019-11-24 23:28:00.152 18051-18343/com.example W/RxJavaPlugins.setErrorHandler - Undeliverable exception received, not sure what to do: java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1036)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1327)
at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:212)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at com.example.client.Client.sendRequest(Client.scala:61)
at com.example.ui.Repository.provideClient(Repository.java:205)
at com.example.ui.Repository.fetchUser(Repository.java:981)
at com.example.ui.Repository.fetchUserWithRole(Repository.java:987)
at com.example.ui.Repository.access$2400(Repository.java:95)
at com.example.ui.Repository$11.lambda$createCall$1$Repository$11(Repository.java:912)
at com.example.ui.-$$Lambda$Repository$11$8ZJhKkqn7hg2E6f5A5NBX1EeUPY.subscribe(lambda)
at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
at io.reactivex.Observable.subscribe(Observable.java:12267)
at io.reactivex.internal.operators.observable.ObservableOnErrorNext.subscribeActual(ObservableOnErrorNext.java:38)
at io.reactivex.Observable.subscribe(Observable.java:12267)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:578)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
at java.lang.Thread.run(Thread.java:761)
这是来自真实代码的堆栈跟踪,但是我简化了当前问题的代码。此堆栈跟踪中的Client.sendRequest
对应于简化示例中的networkApi.getUserList
。
[我不确定,但我只看到一个问题,if (emitter.isDisposed()) return;
跳过了for loop
中的代码,但是当放置发射器时,emitter.onNext()
和emitter.onComplete()
仍然执行,您需要包装emitter.onNext()
和emitter.onComplete()
] >
if (!emitter.isDisposed())
emitter.onNext(conversations);
emitter.onComplete();
尽管我认为这个问题需要是另一个stacktrace错误,但您可以尝试。
以上是关于RxJava2:如何在处置订阅者后避免InterruptibleException?的主要内容,如果未能解决你的问题,请参考以下文章
RxJava2.1.0:在不同线程上订阅时未调用 PublishSubject onNext
RxSwift - PublishSubject - 忽略错误并继续订阅(不要处置)