Retrofit/OkHTTP/RxJava 间歇性 InterruptedIOException

Posted

技术标签:

【中文标题】Retrofit/OkHTTP/RxJava 间歇性 InterruptedIOException【英文标题】:Retrofit/OkHTTP/RxJava intermittent InterruptedIOException 【发布时间】:2018-06-12 14:37:22 【问题描述】:

我正在使用以下(过时的)库:

Retrofit: 1.9.0
OkHTTP: 2.3.0
Rxandroid: 0.24.0

我注意到每隔一段时间我都会收到以下关于我的 POST 请求的堆栈跟踪:

 D/Retrofit: ---> HTTP POST https:xxxxx
 D/Retrofit: Content-Type: application/x-www-form-urlencoded; charset=UTF-8
 D/Retrofit: Content-Length: 396
 D/Retrofit: ---> END HTTP (396-byte body)
 D/Retrofit: ---- ERROR https:xxxxx
 I/Choreographer: Skipped 33 frames!  The application may be doing too much work on its main thread.
 D/Retrofit: java.io.InterruptedIOException
   at okio.Timeout.throwIfReached(Timeout.java:146)
   at okio.Okio$1.write(Okio.java:75)
   at okio.AsyncTimeout$1.write(AsyncTimeout.java:155)
   at okio.RealBufferedSink.flush(RealBufferedSink.java:201)
   at com.squareup.okhttp.internal.http.HttpConnection.flush(HttpConnection.java:140)
   at com.squareup.okhttp.Connection.makeTunnel(Connection.java:399)
   at com.squareup.okhttp.Connection.upgradeToTls(Connection.java:229)
   at com.squareup.okhttp.Connection.connect(Connection.java:159)
   at com.squareup.okhttp.Connection.connectAndSetOwner(Connection.java:175)
   at com.squareup.okhttp.OkHttpClient$1.connectAndSetOwner(OkHttpClient.java:120)
   at com.squareup.okhttp.internal.http.HttpEngine.nextConnection(HttpEngine.java:330)
   at com.squareup.okhttp.internal.http.HttpEngine.connect(HttpEngine.java:319)
   at com.squareup.okhttp.internal.http.HttpEngine.sendRequest(HttpEngine.java:241)
   at com.squareup.okhttp.Call.getResponse(Call.java:271)
   at com.squareup.okhttp.Call$ApplicationInterceptorChain.proceed(Call.java:228)
   at com.squareup.okhttp.Call.getResponseWithInterceptorChain(Call.java:199)
   at com.squareup.okhttp.Call.execute(Call.java:79)
   at retrofit.client.OkClient.execute(OkClient.java:53)
   at retrofit.RestAdapter$RestHandler.invokeRequest(RestAdapter.java:326)
   at retrofit.RestAdapter$RestHandler.invoke(RestAdapter.java:240)
   at java.lang.reflect.Proxy.invoke(Proxy.java:913)
   at $Proxy1.replyTransaction(Unknown Source)
   < App Specific Trace > 
   at rx.Observable$1.call(Observable.java:145)
   at rx.Observable$1.call(Observable.java:137)
   at rx.Observable.unsafeSubscribe(Observable.java:7304)
   at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
   at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:457)
   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:301)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1162)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:636)
   at java.lang.Thread.run(Thread.java:764)
 D/Retrofit: ---- END ERROR
 retrofit.RetrofitError
     at retrofit.RestAdapter$RestHandler.invokeRequest(RestAdapter.java:395)
     at retrofit.RestAdapter$RestHandler.invoke(RestAdapter.java:240)
     at java.lang.reflect.Proxy.invoke(Proxy.java:913)
     at $Proxy1.replyTransaction(Unknown Source)
     < App Specific Trace > 
     at rx.Observable$1.call(Observable.java:145)
     at rx.Observable$1.call(Observable.java:137)
     at rx.Observable.unsafeSubscribe(Observable.java:7304)
     at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
     at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:457)
     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:301)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1162)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:636)
     at java.lang.Thread.run(Thread.java:764)
  Caused by: java.io.InterruptedIOException
     at okio.Timeout.throwIfReached(Timeout.java:146)
     at okio.Okio$1.write(Okio.java:75)
     at okio.AsyncTimeout$1.write(AsyncTimeout.java:155)
     at okio.RealBufferedSink.flush(RealBufferedSink.java:201)
     at com.squareup.okhttp.internal.http.HttpConnection.flush(HttpConnection.java:140)
     at com.squareup.okhttp.Connection.makeTunnel(Connection.java:399)
     at com.squareup.okhttp.Connection.upgradeToTls(Connection.java:229)
     at com.squareup.okhttp.Connection.connect(Connection.java:159)
     at com.squareup.okhttp.Connection.connectAndSetOwner(Connection.java:175)
     at com.squareup.okhttp.OkHttpClient$1.connectAndSetOwner(OkHttpClient.java:120)
     at com.squareup.okhttp.internal.http.HttpEngine.nextConnection(HttpEngine.java:330)
     at com.squareup.okhttp.internal.http.HttpEngine.connect(HttpEngine.java:319)
     at com.squareup.okhttp.internal.http.HttpEngine.sendRequest(HttpEngine.java:241)
     at com.squareup.okhttp.Call.getResponse(Call.java:271)
     at com.squareup.okhttp.Call$ApplicationInterceptorChain.proceed(Call.java:228)
     at com.squareup.okhttp.Call.getResponseWithInterceptorChain(Call.java:199)
     at com.squareup.okhttp.Call.execute(Call.java:79)
     at retrofit.client.OkClient.execute(OkClient.java:53)
     at retrofit.RestAdapter$RestHandler.invokeRequest(RestAdapter.java:326)
     at retrofit.RestAdapter$RestHandler.invoke(RestAdapter.java:240) 
     at java.lang.reflect.Proxy.invoke(Proxy.java:913) 
     at $Proxy1.replyTransaction(Unknown Source) 
     < App Specific Trace > 
     at rx.Observable$1.call(Observable.java:145) 
     at rx.Observable$1.call(Observable.java:137) 
     at rx.Observable.unsafeSubscribe(Observable.java:7304) 
     at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62) 
     at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47) 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:457) 
     at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:301) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1162) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:636) 
     at java.lang.Thread.run(Thread.java:764)

我的代码如下:

网络调用:

    return Observable.create(new Observable.OnSubscribe<Object>() 
      @Override public void call(Subscriber<? super Object> subscriber) 
        try 
          subscriber.onNext(/* Sync network call here. */);
         catch (Exception e) 
          subscriber.onError(e);
        
        subscriber.onCompleted();
      
    )
        .onBackpressureBuffer()
        // singleton for Schedulers.io()
        .subscribeOn(ioscheduler)
        // singleton for AndroidSchedulers.mainThread()
        .observeOn(mainThreadScheduler);

这样称呼:

subscription = makeNetworkCall()
            .subscribe(new Observer<Object>() 
              @Override public void onNext(Object object) 
                // close and finish activity
              
              @Override public void onError(Throwable e) 
                // whoops!
              
            );

并且在 Activity 中被取消订阅,如下所示:

  @Override
  protected void onPause() 
    super.onPause();
    if (subscription != null) 
      subscription.unsubscribe();
      subscription = null;
      if (condition) 
        finish();
      
    
  

我对此的理解是我得到了一个InterruptedException,因为我是来自Subscription 中的unsubscribe()ing onPause()。我们会为其他请求执行此操作,所以我不确定为什么我只在这里看到它以及为什么会发生这种情况。

为了提供更多关于这方面的信息,网络调用发生在一个从锁定屏幕启动的 Activity 中,通过通知操作强制用户解锁他们的设备以采取行动。

我的问题是为什么会发生这种情况,是否有补救的好方法?根据一些文章,onPause() 是您应该取消订阅Observables 的地方。

更多信息请见In okHTTP's github issues

谢谢!

【问题讨论】:

当心你破坏了Observable Contract。如果您调用onError,您不得调用onCompleted 【参考方案1】:

这是一个猜测,但可能是您在发出中断的异常之前没有检查订阅者是否仍然存在。

试试这个:

return Observable.create(new Observable.OnSubscribe<Object>() 
      @Override public void call(Subscriber<? super Object> subscriber) 
        try 
          subscriber.onNext(/* Sync network call here. */);
          subscriber.onCompleted();
         catch (Exception e) 
          if (!subscriber.isUnsubscribed())  // <-- check before emitting error
            subscriber.onError(e);
          
        
      
    )

【讨论】:

【参考方案2】:

取消订阅后,链不再需要做任何工作,因为我们不再关心输出。我们可以让当前的操作完成,并在到达下一个操作员时干净地停止工作,但是为什么要浪费我们不需要的资源呢?如果有正在进行的网络请求,我们想取消它。我们通过中断线程来做到这一点。如果/当此线程上的代码检查它是否被中断时,则不需要做更多的工作,并且可以抛出InterruptedIOException

取决于调用取消订阅的确切时间,您可能会或可能不会看到错误。如果在请求开始加载之前或完成之后调用取消订阅,那么您可能不会看到此错误。

所以这个异常的原因是 OkHttp 一旦知道你对它的结果不感兴趣就想放弃,而 RxJava 不想吞下它,以防它对你很重要。

您可以添加一个默认错误处理程序来抑制无法交付的InterruptedExceptions 和InterruptedIOExceptions,它们很少有用。见RxJava 2 Error Handling,我相信RxJava 1 也有类似的结构,但你应该考虑迁移到RxJava 2。其实差别不大,RxBinding 已经更新到RxJava 2。

onPause() 不一定是您应该取消订阅的位置。您应该在您的应用程序逻辑需要的地方以及可以防止内存泄漏的地方取消订阅。

【讨论】:

【参考方案3】:

试试这个:

return Observable.create(e -> 
           try 
               e.onNext(...);
            catch (Exception ex) 
               e.tryOnError(ex);
           
           e.onComplete();
       );

tryOnError() 在发送错误时检查 Observer 是否被释放。

参考:https://github.com/ReactiveX/RxJava/issues/4863

【讨论】:

以上是关于Retrofit/OkHTTP/RxJava 间歇性 InterruptedIOException的主要内容,如果未能解决你的问题,请参考以下文章

Mvp Retrofit Okhttp Rxjava

Android入门第65天-mvvm模式下的retrofit2+okhttp3+rxjava

基于Retrofit+RxJava 封装 Leopard 网络框架

Android 网络框架 Retrofit2.0介绍使用和封装

无间歇文字滚动_ 原生js实现新闻无间歇性上下滚动

jQuery BreakingNews 间歇滚动