在Android上调试时RxJava缓存线程中的InterruptedException

Posted

技术标签:

【中文标题】在Android上调试时RxJava缓存线程中的InterruptedException【英文标题】:InterruptedException in RxJava cache thread when debugging on Android 【发布时间】:2016-04-25 02:46:12 【问题描述】:

有时当我调试我的应用程序时,我会在 RxCachedThreadScheduler-1 中遇到 InterruptedException。这是跟踪:

Fatal Exception: java.lang.InterruptedException
       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:1991)
       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2025)
       at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1048)
       at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:776)
       at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1035)

我有一个自定义视图,我在其中订阅我的 observable,如下所示:

@Override
protected void onAttachedToWindow() 
    super.onAttachedToWindow();

    sub = FollowHandler.getInstance().getObservable()
            .filter(new Func1<FollowEvent, Boolean>() 
                @Override
                public Boolean call(FollowEvent followEvent) 
                    if(followEvent == null || followEvent.user == null
                            || user == null)
                        return false;

                    return followEvent.user.id == user.id;
                
            )
            .observeOn(androidSchedulers.mainThread())
            .subscribe(new Observer<FollowEvent>() 
                @Override
                public void onCompleted() 

                @Override
                public void onError(Throwable e) 

                @Override
                public void onNext(FollowEvent followEvent) 
                    reactToThisNiceEvent(followEvent);
                
            );


@Override
protected void onDetachedFromWindow() 
    super.onDetachedFromWindow();

    if(sub != null)
        sub.unsubscribe();

这是可观察的:

eventSubject.asObservable()
        .observeOn(Schedulers.io())
        .doOnNext(new Action1<FollowEvent>() 
            @Override
            public void call(FollowEvent followEvent) 
                if(followEvent != null)
                    doSomethingNice(followEvent);
            
        )
        .share();

其中 eventSubject 是一个简单的 PublishSubject。 我正在使用 RxAndroid 1.1.0 和 RxJava 1.1.0。

有人知道为什么会这样吗?

【问题讨论】:

【参考方案1】:

我不确定为什么会这样,但请尝试这样做:

sub = FollowHandler.getInstance().getObservable()
            .filter(...)
            .subscribeOn(Schedulers.io())    //  <<<<<<<<<<
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(...);

另外,我认为你不需要share()

eventSubject.asObservable()
        .doOnNext(...)
        .subscribeOn(Schedulers.io())   // <<<<< subscribeOn instead of observeOn, but actually, you don't need it here...
        .share();     // <<<<< remove it

如上所述,我经常使用Subject 作为事件总线。而且我从来没有遇到过这样的问题。

附: 如果您检查订阅是否取消订阅,onDetachedFromWindow() 会更好。我知道这个方法在主线程中调用,并发访问这个Subscription是不可能的,但我认为它的风格很好:

if(sub != null && !sub.isUnsubscribed())
        sub.unsubscribe();

【讨论】:

感谢您的回答。我会按照您的建议尝试执行 subscribeOn 和 observeOn 。但是,我可能需要一段时间来测试它。关于份额,我确实需要它。因为共享之前的任务必须在所有订阅者之间共享(并且所有订阅者的结果都是相同的)。如果我删除共享,将为所有订阅者调用 doOnNext(这是毫无意义的)。并感谢您对 isUnsubscribed() 检查的更正。我完全错过了。

以上是关于在Android上调试时RxJava缓存线程中的InterruptedException的主要内容,如果未能解决你的问题,请参考以下文章

Android异步框架RxJava 1.x系列 - 事件及事件序列转换原理

Android实战——RxJava2解锁图片三级缓存框架

RxJava unsubscribeOn,递归调用不在正确的线程中

Android RxJava:你必须了解的线程控制(含实例讲解)

Android函数响应式编程最新RxJava-线程控制

Carson带你学Android:RxJava线程控制(含实例讲解)