RxJava2 UndeliverableException when orientation change is happening while fetching data

【Posted】: 2019-03-08 22:51:11 【Question】:

If I change the device's orientation while the app is fetching new redditNews, I get the following error.

  E/androidRuntime: FATAL EXCEPTION: RxCachedThreadScheduler-1
    Process: com.spicywdev.schmeddit, PID: 26522
    io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | null
        at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
        at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:73)
        at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:43)
        at io.reactivex.Observable.subscribe(Observable.java:12090)
        at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
        at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:578)
        at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
        at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
        at java.util.concurrent.FutureTask.run(FutureTask.java:237)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
        at java.lang.Thread.run(Thread.java:762)
     Caused by: java.io.InterruptedIOException
        at okhttp3.internal.http2.Http2Stream.waitForIo(Http2Stream.java:579)
        at okhttp3.internal.http2.Http2Stream.takeResponseHeaders(Http2Stream.java:143)
        at okhttp3.internal.http2.Http2Codec.readResponseHeaders(Http2Codec.java:125)
        at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:88)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
        at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
        at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:126)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
        at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:200)
        at okhttp3.RealCall.execute(RealCall.java:77)
        at retrofit2.OkHttpCall.execute(OkHttpCall.java:180)
        at retrofit2.ExecutorCallAdapterFactory$ExecutorCallbackCall.execute(ExecutorCallAdapterFactory.java:91)
        at com.spicywdev.schmeddit.features.news.NewsManager$getNews$1.subscribe(NewsManager.kt:16)
        at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)

This is what the class looks like. The error occurs in the function requestNews().

class NewsFragment : RxBaseFragment() {

    companion object {
        private val KEY_REDDIT_NEWS = "redditNews"
    }

    private var redditNews: RedditNews? = null
    private val newsManager by lazy { NewsManager() }

    override fun onCreateView(inflater: LayoutInflater, container: ViewGroup?, savedInstanceState: Bundle?): View? {
        return container?.inflate(R.layout.news_fragment)
    }

    override fun onActivityCreated(savedInstanceState: Bundle?) {
        super.onActivityCreated(savedInstanceState)

        news_list.apply {
            setHasFixedSize(true)
            val linearLayoutManager = LinearLayoutManager(context)
            layoutManager = linearLayoutManager
            clearOnScrollListeners()
            // Load the next page when the list is scrolled to the end.
            addOnScrollListener(InfiniteScrollListener({ requestNews() }, linearLayoutManager))
        }

        initAdapter()

        if (savedInstanceState != null && savedInstanceState.containsKey(KEY_REDDIT_NEWS)) {
            redditNews = savedInstanceState.get(KEY_REDDIT_NEWS) as RedditNews
            (news_list.adapter as NewsAdapter).clearAndAddNews(redditNews!!.news)
        } else {
            requestNews()
        }
    }

    override fun onSaveInstanceState(outState: Bundle) {
        super.onSaveInstanceState(outState)
        val news = (news_list.adapter as NewsAdapter).getNews()
        if (redditNews != null && news.isNotEmpty()) {
            outState.putParcelable(KEY_REDDIT_NEWS, redditNews?.copy(news = news))
        }
    }

    private fun requestNews() {
        /**
         * The first time, an empty string is sent for the after parameter.
         * On later calls redditNews is set, so its after value is used
         * to navigate to the next page.
         */
        val subscription = newsManager.getNews(redditNews?.after ?: "")
            .subscribeOn(Schedulers.io())
            .subscribe(
                { retrievedNews ->
                    redditNews = retrievedNews
                    (news_list.adapter as NewsAdapter).addNews(retrievedNews.news)
                },
                { e ->
                    Snackbar.make(news_list, e.message ?: "WZF", Snackbar.LENGTH_LONG).show()
                }
            )
        subscriptions.add(subscription)
    }

    private fun initAdapter() {
        if (news_list.adapter == null) {
            news_list.adapter = NewsAdapter()
        }
    }
}

I'm still quite new to RxJava, so I would be very glad if someone could help me with this. Thank you.
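A note on reading the trace: the `Caused by: java.io.InterruptedIOException` sits beneath `Scheduler$DisposeTask` and `FutureTask`, which suggests that the blocking Retrofit `execute()` call was interrupted when the subscription was disposed (presumably by `RxBaseFragment` while the fragment was torn down for the rotation). By then no consumer is left, so RxJava reports the failure as an UndeliverableException. The sketch below reproduces only the interruption mechanism with the plain JDK. It does not use RxJava, the class and method names are hypothetical, and `Thread.sleep` stands in for the network call.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class CancelInterruptDemo {

    /**
     * Starts a blocking task, cancels it with interruption, and returns the
     * simple class name of the exception the task observed (or "none").
     */
    static String runAndCancel() {
        ExecutorService io = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicReference<String> seen = new AtomicReference<>("none");
        try {
            Future<?> task = io.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(10_000); // stand-in for the blocking network call
                } catch (InterruptedException e) {
                    // The caller has already cancelled: nobody is waiting for this error.
                    seen.set(e.getClass().getSimpleName());
                } finally {
                    finished.countDown();
                }
            });
            started.await();       // make sure the task is running
            task.cancel(true);     // analogous to disposing the subscription
            finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            io.shutdownNow();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndCancel()); // prints "InterruptedException"
    }
}
```

The task only learns about the cancellation through an exception, and at that point the party that started it has stopped listening. That is the situation the UndeliverableException message describes.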

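【Comments】:

Did you read the exception message?

@akarnokd Yes, I think I understand the problem: the exception does not reach its destination. But I really don't understand how to fix it in the context of RxJava. :/

The error message contains a link to the wiki page you should read.

That wiki section has an example: search for RxJavaPlugins.setErrorHandler.

It is a global handler; define it early in the application's lifecycle.

【Answer 1】: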

You can override the Rx error handler as follows:

    Extend the Application class with your own MyApplication class.

    Then, in the onCreate method of your application class, write:

    @Override
    public void onCreate() {
        super.onCreate();
        RxJavaPlugins.setErrorHandler(throwable -> {}); // nothing or some logging
    }

    Add the MyApplication class to the manifest:

    <application
        android:name=".MyApplication"
        ...>

For more information, see the RxJava Wiki.
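An empty handler silences every undeliverable error, including ones that point to real bugs. The wiki page linked in the error message suggests inspecting the throwable instead. The following is a library-free sketch of that decision, not the wiki's code: the helper name `isCancellationNoise` is hypothetical, it walks the cause chain (in the trace above the UndeliverableException wraps the InterruptedIOException as its cause), and treating every I/O or interruption error as ignorable is an assumption you may want to narrow.

```java
import java.io.IOException;
import java.io.InterruptedIOException;

final class UndeliverableFilter {

    /**
     * Returns true if t, or any throwable in its cause chain, is an I/O or
     * interruption error, i.e. the kind typically raised by a cancelled call.
     */
    static boolean isCancellationNoise(Throwable t) {
        int depth = 0; // bound the walk in case of a cyclic cause chain
        for (Throwable e = t; e != null && depth < 16; e = e.getCause(), depth++) {
            if (e instanceof IOException || e instanceof InterruptedException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // RuntimeException stands in for the wrapping UndeliverableException.
        Throwable wrapped = new RuntimeException(new InterruptedIOException());
        System.out.println(isCancellationNoise(wrapped));                          // true
        System.out.println(isCancellationNoise(new IllegalStateException("bug"))); // false
    }
}
```

Inside the lambda passed to RxJavaPlugins.setErrorHandler, one could return early when the helper reports true and otherwise forward the throwable to the current thread's uncaught-exception handler, so that genuine bugs still crash visibly.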

【Comments】:

With Kotlin it is simpler: RxJavaPlugins.setErrorHandler { }

【Answer 2】:

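To catch errors like this one, which may be thrown by a third-party library or by your own code, use RxJavaPlugins and put the following code in the fun where you use Rx, or ... Uncaught Rx errors are passed to this handler, so you have to check the exception type:

    RxJavaPlugins.setErrorHandler {
        if (it is UndeliverableException) {
            // Note: the handler runs on the thread that raised the error (here an
            // Rx io thread); a Toast has to be shown from the main thread.
            Toast.makeText(view?.getContext(), "UndeliverableException: " + it.message, Toast.LENGTH_LONG).show()
            return@setErrorHandler
        }
    }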


【Comments】:

【Answer 3】:

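Possibly (untested): you could replace onError in the Observable with onNext(tag), where the tag distinguishes a "normal onNext" from an "error onNext" (the one that replaces onError).

onNext(0) => emitted because of an exception (you can pass the exception itself through a public static String)

onNext(1) => no error

emitter.onError(ex); => emitter.onNext(0);

In the observer:

@Override
public void onNext(Integer message) {
    if (message == 0)
        handleError();
    else
        handleNext();
}

Of course this is not very "clean", but it solves the problem...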
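A less intrusive alternative to tagging values is to signal the error only while the consumer is still subscribed. RxJava 2 emitters expose isDisposed() for this check, and newer 2.x releases add tryOnError(), which returns false instead of routing the error to the global handler. The sketch below models the idea without the library: `ErrorSink` is a hypothetical stand-in, not an RxJava type, and it only mirrors the shape of those two methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

final class GuardedErrorDemo {

    /** Hypothetical stand-in for an emitter; not part of RxJava. */
    static final class ErrorSink {
        private final AtomicBoolean disposed = new AtomicBoolean(false);
        final List<Throwable> delivered = new ArrayList<>();

        void dispose() {
            disposed.set(true);
        }

        boolean isDisposed() {
            return disposed.get();
        }

        /** Delivers the error only if the consumer is still there; reports whether it did. */
        boolean tryOnError(Throwable t) {
            if (isDisposed()) {
                return false; // consumer is gone: drop the error instead of crashing
            }
            delivered.add(t);
            return true;
        }
    }

    public static void main(String[] args) {
        ErrorSink sink = new ErrorSink();
        System.out.println(sink.tryOnError(new RuntimeException("while subscribed"))); // true
        sink.dispose(); // e.g. the fragment went away during rotation
        System.out.println(sink.tryOnError(new RuntimeException("after dispose")));    // false
    }
}
```

With this approach the observer keeps its normal onError path, and only errors that arrive after disposal are dropped.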
