使用 OkHttp3 和 ReactiveX Java 实现长轮询的正确方法

Posted

技术标签:

【中文标题】使用 OkHttp3 和 ReactiveX Java 实现长轮询的正确方法【英文标题】:Correct way to implement long polling using OkHttp3 and ReactiveX Java 【发布时间】:2021-01-19 12:49:30 【问题描述】:

如何使用 OkHttp3 (v4.4.1) 实现长轮询,以获取响应的每一行的 RxJava (v2.2.11) Observable?可以在不阻塞线程保持读取行的情况下完成吗?如果我需要阻塞某个线程,那么我应该阻塞哪个线程?关于使用 OkHttp3 实现长轮询的任何一般示例? Google 在这个话题上对我很害羞...

TL;DR

我使用 OKHttp3 作为 HTTP 客户端,并将其包装在 makeGetObservable 方法调用中,该方法调用返回 Observable 响应,使用 newCall 回调向 Observable 发出事件。现在我正在尝试添加对长轮询服务的支持,我担心线程。

下面的代码演示了我正在尝试做的事情(并且似乎可以工作),但我很确定它是不行的。

// return Observable<Response>
makeGetObservable("http://my.service.com/api/events")
  // check for error and map to Observable<ResponseBody>
  .map(this::mapRespBodyOrError)
  // flat map to Observable<String> representing line of long polling response
  .flatMap(respBody -> Observable.create(emitter -> 
    // open reader on response body stream
    try (BufferedReader reader = new BufferedReader(respBody.charStream())) 
      String line;
      // block and wait to read a line from input
      while((line = reader.readLine()) !=null) 
        // once line was read from response body input stream emit it as observable event
        emitter.onNext(line);
      
    
  ));

【问题讨论】:

您在寻找observeOn()吗? 可能:) 我不知道更改调度程序是否是最合适的答案。如果我不必创建线程来阻止它,我会更喜欢 花了一些时间试验线程,发现被阻塞的线程是来自 okhttp3 客户端线程池的线程。默认值为 5,因此这是停止工作的限制。使用@Progman 建议的observeOn 和 IO 调度程序可以解决问题,因为它只会为每次阻塞读取生成新线程 【参考方案1】:

经过一番研究,我发现被阻塞的线程是 OkHttp 客户端线程。这是由于我实现了 makeGetObservable,它从 OkHttp 客户端的 newCall..enqueue 回调中发出。 OkHttp 客户端默认有 5 个线程池,用于一个资源的 5 个并发连接。因此,每次订阅另一个长轮询资源时,我都会阻塞其中一个线程。在 5 个阻塞线程后,OkHttp 客户端停止工作,因为它没有线程来处理响应。

正如@Progman 所建议的,我使用 IO 调度程序与subscribeOn 一起使用,该调度程序为每个阻塞 IO 操作生成新线程。使用此调度程序必须小心并正确处理资源。

我的实现目前如下所示(添加了调度程序、完成和错误事件)

// return Observable<Response>
makeGetObservable("http://my.service.com/api/events")
  // check for error and map to Observable<ResponseBody>
  .map(this::mapRespBodyOrError)
  // use IO scheduler that spawns new threads to take care of blocking operations
  .observeOn(Schedulers.io())
  // flat map to Observable<String> representing line of long polling response
  .flatMap(respBody -> Observable.create(emitter -> 
    try (BufferedReader reader = new BufferedReader(respBody.charStream())) 
      // blocking read lines while available
      String line;
      while ((line = reader.readLine()) != null) 
        // emit event for every line
        emitter.onNext(line);
      
      // emit the completion event to indicate we are done
      emitter.onComplete();
     catch (IOException | RuntimeException err) 
      // emit any error that might have occurred
      emitter.onError(err);
    
  ));

【讨论】:

以上是关于使用 OkHttp3 和 ReactiveX Java 实现长轮询的正确方法的主要内容,如果未能解决你的问题,请参考以下文章

RxJava 学习资料——ReactiveX和RxJava

原创ReactiveX之基础思路被观察者和观察者 (RxJava为例子)

Android实战——okhttp3的使用和封装

ReactiveX JS 和 TypeScript - 如何取消订阅?

io.reactivex.exceptions.UndeliverableException:java.io.InterruptedIOException

Okhttp3 使用和原理