在Android上调试时RxJava缓存线程中的InterruptedException
Posted
技术标签:
【中文标题】在Android上调试时RxJava缓存线程中的InterruptedException【英文标题】:InterruptedException in RxJava cache thread when debugging on Android 【发布时间】:2016-04-25 02:46:12 【问题描述】:有时当我调试我的应用程序时,我会在 RxCachedThreadScheduler-1 中遇到 InterruptedException。这是跟踪:
Fatal Exception: java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:1991)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2025)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1048)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:776)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1035)
我有一个自定义视图,我在其中订阅我的 observable,如下所示:
@Override
protected void onAttachedToWindow()
super.onAttachedToWindow();
sub = FollowHandler.getInstance().getObservable()
.filter(new Func1<FollowEvent, Boolean>()
@Override
public Boolean call(FollowEvent followEvent)
if(followEvent == null || followEvent.user == null
|| user == null)
return false;
return followEvent.user.id == user.id;
)
.observeOn(androidSchedulers.mainThread())
.subscribe(new Observer<FollowEvent>()
@Override
public void onCompleted()
@Override
public void onError(Throwable e)
@Override
public void onNext(FollowEvent followEvent)
reactToThisNiceEvent(followEvent);
);
@Override
protected void onDetachedFromWindow()
super.onDetachedFromWindow();
if(sub != null)
sub.unsubscribe();
这是可观察的:
eventSubject.asObservable()
.observeOn(Schedulers.io())
.doOnNext(new Action1<FollowEvent>()
@Override
public void call(FollowEvent followEvent)
if(followEvent != null)
doSomethingNice(followEvent);
)
.share();
其中 eventSubject 是一个简单的 PublishSubject。 我正在使用 RxAndroid 1.1.0 和 RxJava 1.1.0。
有人知道为什么会这样吗?
【问题讨论】:
【参考方案1】:我不确定为什么会这样,但请尝试这样做:
sub = FollowHandler.getInstance().getObservable()
.filter(...)
.subscribeOn(Schedulers.io()) // <<<<<<<<<<
.observeOn(AndroidSchedulers.mainThread())
.subscribe(...);
另外,我认为你不需要share()
:
eventSubject.asObservable()
.doOnNext(...)
.subscribeOn(Schedulers.io()) // <<<<< subscribeOn instead of observeOn, but actually, you don't need it here...
.share(); // <<<<< remove it
如上所述,我经常使用Subject
作为事件总线。而且我从来没有遇到过这样的问题。
附:
如果您检查订阅是否取消订阅,onDetachedFromWindow()
会更好。我知道这个方法在主线程中调用,并发访问这个Subscription
是不可能的,但我认为它的风格很好:
if(sub != null && !sub.isUnsubscribed())
sub.unsubscribe();
【讨论】:
感谢您的回答。我会按照您的建议尝试执行 subscribeOn 和 observeOn 。但是,我可能需要一段时间来测试它。关于份额,我确实需要它。因为共享之前的任务必须在所有订阅者之间共享(并且所有订阅者的结果都是相同的)。如果我删除共享,将为所有订阅者调用 doOnNext(这是毫无意义的)。并感谢您对 isUnsubscribed() 检查的更正。我完全错过了。以上是关于在Android上调试时RxJava缓存线程中的InterruptedException的主要内容,如果未能解决你的问题,请参考以下文章
Android异步框架RxJava 1.x系列 - 事件及事件序列转换原理
RxJava unsubscribeOn,递归调用不在正确的线程中