Rx Java:Observable 在订阅者请求之前发出项目
Posted
技术标签:
【中文标题】Rx Java:Observable 在订阅者请求之前发出项目【英文标题】:Rx Java: Observable emits items before subscriber requests them 【发布时间】:2017-06-11 17:36:45 【问题描述】:在下面的示例中,我希望我的订阅者最初从 observable 请求两个项目,然后每 5 秒再请求两个:
public class RxJavaExample
public static void main(String[] args)
Observable.range(1, 6)
.flatMap(testData ->
System.out.println("next item");
return Observable.just(testData);
, 1)
.toBlocking()
.subscribe(new Subscriber<Integer>()
public void onStart()
System.out.println("requesting\n\n");
request(2);
Observable.interval(5, SECONDS)
.subscribe(
x ->
System.out.println("requesting\n\n");
request(2);
);
@Override
public void onCompleted()
System.out.println("done");
@Override
public void onError(Throwable e)
System.err.println(e);
@Override
public void onNext(Integer testData)
System.out.println("OnNext");
);
在这种情况下,我希望输出是:
requesting
next item
next item
OnNext
OnNext
requesting
next item
next item
OnNext
OnNext
requesting
next item
next item
OnNext
OnNext
done
但这是我得到的实际输出:
next item
requesting
next item
next item
OnNext
OnNext
requesting
next item
OnNext
OnNext
next item
requesting
next item
OnNext
OnNext
done
在我的订阅者请求任何内容之前,Observable 似乎仍然会发出一个项目。为什么会发生这种情况?有没有办法确保 Observable 仅在我的订阅者请求时才发出项目?
我的示例相当做作,因为我可以使用 map
而不是 flatMap
,这会给我想要的行为,但对于我的实际用例,我确实需要使用 flatMap
。
【问题讨论】:
【参考方案1】:这是因为flatMap()
操作符,它根据请求的并发级别向生产者添加请求(来源range()
Observable),在您的情况下,您要求并发1,所以flatMap
将调用@ 987654325@ 在Observable
范围内,因此生产者将在订阅者请求任何内容之前生产单个项目。
注意,这里有 2 个缓冲区,虽然 flatMap()
在订阅者之前请求了项目,但它只会导致 range() 生成项目,但你不会在 onNext()
处获得未请求的项目,这意味着您的 onNext 序列符合预期,只是 range()
以不同的方式生成项目。
顺便说一句,在默认情况下flatMap()
意味着没有并发限制,它将请求无限项,在这种情况下,所有项目都将立即生成,而不仅仅是你的情况。
【讨论】:
感谢您的解释。这似乎仍然是奇怪的行为。为什么flatMap
不只是等待我的订阅者进行请求?我将并发限制设置为 1 的原因是因为在我的实际用例中,我在 flatMap
中执行异步操作,并且此操作每秒只能处理一定数量的项目。 Observable 在没有任何请求的情况下发出项目似乎很浪费。
你是对的,简而言之,这在技术上是可行的,但这种行为历史上来自 RX.NET,你可以从 David Karnok 在他的博客中更深入地了解和更好的解释:akarnokd.blogspot.co.il/2016/02/flatmap-part-1.html在背压部分下。以上是关于Rx Java:Observable 在订阅者请求之前发出项目的主要内容,如果未能解决你的问题,请参考以下文章
rx.js Observable 剖析(创建,订阅,执行,清理 )
Rx.Observable subscribe 和 forEach 有啥区别
为啥 UIView 框架上的 observable 只在订阅后触发一次
Angular RxJS入门笔记 (Observable可观察对象Subscribe订阅Observer观察者Subscription对象)