RxJava2:如何在处置订阅者后避免InterruptibleException?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava2:如何在处置订阅者后避免InterruptibleException?相关的知识,希望对你有一定的参考价值。

我有一个观察对象:

public Observable<List<Conversation>> getConversationListObservable() 
    return Observable.create(emitter -> 
        List<Conversation> conversations = networkApi.getConversations();
        for (Conversation conversation : conversations) 
            if (emitter.isDisposed()) return;  // this will cause subscription to terminate.
            List<User> users = networkApi.getUserList(conversation.getId());
            conversation.setUsers(users);
        
        emitter.onNext(conversations);
        emitter.onComplete();
    );

androidx.lifecycle.ViewModel类中使用的:

public class ConversationViewModel extends ViewModel 
    private CompositeDisposable disposables = new CompositeDisposable();
    ....
    public void fetchConversationList()
        disposables.add(repository.getConversationListObservable()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
                .subscribe(this::setConversations, this::onError));
    

[当我离开带有Conversation列表的屏幕时,将放置此可观察的对象,但是我在logcat类中的RxJavaPlugins.setErrorHandler中从Application中发出警告:

W/RxJavaPlugins.setErrorHandler - Undeliverable exception received, not sure what to do: java.lang.InterruptedException

networkApi.getUserList通话中。好像当我进行此网络呼叫时,我的订户并没有在网络呼叫开始时就被释放,而在我获得对呼叫的响应时已经被释放。除了从InterruptedException类中删除此插件之外,是否有其他方法无法在RxJavaPlugins.setErrorHandler中获得Application

P.S。:堆栈跟踪如下:

2019-11-24 23:28:00.152 18051-18343/com.example W/RxJavaPlugins.setErrorHandler - Undeliverable exception received, not sure what to do: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1036)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1327)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:212)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at com.example.client.Client.sendRequest(Client.scala:61)
    at com.example.ui.Repository.provideClient(Repository.java:205)
    at com.example.ui.Repository.fetchUser(Repository.java:981)
    at com.example.ui.Repository.fetchUserWithRole(Repository.java:987)
    at com.example.ui.Repository.access$2400(Repository.java:95)
    at com.example.ui.Repository$11.lambda$createCall$1$Repository$11(Repository.java:912)
    at com.example.ui.-$$Lambda$Repository$11$8ZJhKkqn7hg2E6f5A5NBX1EeUPY.subscribe(lambda)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:12267)
    at io.reactivex.internal.operators.observable.ObservableOnErrorNext.subscribeActual(ObservableOnErrorNext.java:38)
    at io.reactivex.Observable.subscribe(Observable.java:12267)
    at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
    at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:578)
    at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
    at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
    at java.util.concurrent.FutureTask.run(FutureTask.java:237)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
    at java.lang.Thread.run(Thread.java:761)

这是来自真实代码的堆栈跟踪,但是我简化了当前问题的代码。此堆栈跟踪中的Client.sendRequest对应于简化示例中的networkApi.getUserList

答案

[我不确定,但我只看到一个问题,if (emitter.isDisposed()) return;跳过了for loop中的代码,但是当放置发射器时,emitter.onNext()emitter.onComplete()仍然执行,您需要包装emitter.onNext()emitter.onComplete()] >

if (!emitter.isDisposed())
        emitter.onNext(conversations);
        emitter.onComplete();

尽管我认为这个问题需要是另一个stacktrace错误,但您可以尝试。

以上是关于RxJava2:如何在处置订阅者后避免InterruptibleException?的主要内容,如果未能解决你的问题,请参考以下文章

RxJava2.1.0:在不同线程上订阅时未调用 PublishSubject onNext

在 RxJava2(Android) 中订阅 Vs 订阅?

RXJava2 管理订阅

RxSwift - PublishSubject - 忽略错误并继续订阅(不要处置)

Rxjava2 可连接的Observable(ConnectableObservable)操作详解及实例

RxJava2 中多种取消订阅 dispose 的方法梳理( 源码分析 )