Rx Java:Observable 在订阅者请求之前发出项目

Posted

技术标签:

【中文标题】Rx Java:Observable 在订阅者请求之前发出项目【英文标题】:Rx Java: Observable emits items before subscriber requests them 【发布时间】:2017-06-11 17:36:45 【问题描述】:

在下面的示例中,我希望我的订阅者最初从 observable 请求两个项目,然后每 5 秒再请求两个:

public class RxJavaExample 

public static void main(String[] args) 
  Observable.range(1, 6)
      .flatMap(testData -> 
        System.out.println("next item");
        return Observable.just(testData);
      , 1)
    .toBlocking()
    .subscribe(new Subscriber<Integer>() 

      public void onStart() 
        System.out.println("requesting\n\n");
        request(2);
        Observable.interval(5, SECONDS)
            .subscribe(
                x -> 
                  System.out.println("requesting\n\n");
                  request(2);
                ); 
      

      @Override
      public void onCompleted() 
        System.out.println("done");
      

      @Override
      public void onError(Throwable e) 
        System.err.println(e);
      

      @Override
      public void onNext(Integer testData) 
        System.out.println("OnNext");
      
    );
 

在这种情况下,我希望输出是:

requesting


next item
next item
OnNext
OnNext
requesting


next item
next item
OnNext
OnNext
requesting


next item
next item
OnNext
OnNext
done

但这是我得到的实际输出:

  next item
  requesting


  next item
  next item
  OnNext
  OnNext
  requesting


  next item
  OnNext
  OnNext
  next item
  requesting


  next item
  OnNext
  OnNext
  done

在我的订阅者请求任何内容之前,Observable 似乎仍然会发出一个项目。为什么会发生这种情况?有没有办法确保 Observable 仅在我的订阅者请求时才发出项目?

我的示例相当做作,因为我可以使用 map 而不是 flatMap,这会给我想要的行为,但对于我的实际用例,我确实需要使用 flatMap

【问题讨论】:

【参考方案1】:

这是因为flatMap() 操作符,它根据请求的并发级别向生产者添加请求(来源range() Observable),在您的情况下,您要求并发1,所以flatMap 将调用@ 987654325@ 在Observable 范围内,因此生产者将在订阅者请求任何内容之前生产单个项目。

注意,这里有 2 个缓冲区,虽然 flatMap() 在订阅者之前请求了项目,但它只会导致 range() 生成项目,但你不会在 onNext() 处获得未请求的项目,这意味着您的 onNext 序列符合预期,只是 range() 以不同的方式生成项目。

顺便说一句,在默认情况下flatMap() 意味着没有并发限制,它将请求无限项,在这种情况下,所有项目都将立即生成,而不仅仅是你的情况。

【讨论】:

感谢您的解释。这似乎仍然是奇怪的行为。为什么flatMap 不只是等待我的订阅者进行请求?我将并发限制设置为 1 的原因是因为在我的实际用例中,我在 flatMap 中执行异步操作,并且此操作每秒只能处理一定数量的项目。 Observable 在没有任何请求的情况下发出项目似乎很浪费。 你是对的,简而言之,这在技术上是可行的,但这种行为历史上来自 RX.NET,你可以从 David Karnok 在他的博客中更深入地了解和更好的解释:akarnokd.blogspot.co.il/2016/02/flatmap-part-1.html在背压部分下。

以上是关于Rx Java:Observable 在订阅者请求之前发出项目的主要内容,如果未能解决你的问题,请参考以下文章

rx.js Observable 剖析(创建,订阅,执行,清理 )

Rx.Observable subscribe 和 forEach 有啥区别

Rxjs 如何知道 observable 有多少订阅者?

为啥 UIView 框架上的 observable 只在订阅后触发一次

订阅者缺少消息;这是 Rx 的错误还是我做错了?

Angular RxJS入门笔记 (Observable可观察对象Subscribe订阅Observer观察者Subscription对象)