如何合并多个 observables 并保持顺序和最大并发?
Posted
技术标签:
【中文标题】如何合并多个 observables 并保持顺序和最大并发?【英文标题】:How to merge multiple observables with order preservation and maximum concurrency? 【发布时间】:2021-02-26 15:58:19 【问题描述】:我搜索了一个副本,但没有找到。我拥有的是一个嵌套的 observable IObservable<IObservable<T>>
,我想将它展平为 IObservable<T>
。我不想使用 Concat
运算符,因为它会延迟对每个内部 observable 的订阅,直到前一个 observable 完成。这是一个问题,因为内部 observable 很冷,我希望它们在外部 observable 发出 T
值后立即开始发出。我也不想使用Merge
运算符,因为它会打乱发出值的顺序。下面的大理石图显示了Merge
运算符的有问题的(就我而言)行为,以及理想的合并行为。
Stream of observables: +----1------2-----3----|
Observable-1 : +--A-----------------B-------|
Observable-2 : +---C---------------------D------|
Observable-3 : +--E--------------------F-------|
Merge (undesirable) : +-------A-------C----E----B-----------D---F-------|
Desirable merging : +-------A-----------------B-------C---D------EF---|
Observable-1 发出的所有值都应该在 Observable-2 发出的任何值之前。 Observable-2 和 Observable-3 也应如此。
我喜欢Merge
运算符的原因是它允许配置对内部可观察对象的最大并发订阅。我想使用我正在尝试实现的自定义 MergeOrdered
运算符保留此功能。这是我正在构建的方法:
public static IObservable<T> MergeOrdered<T>(
this IObservable<IObservable<T>> source,
int maximumConcurrency = Int32.MaxValue)
return source.Merge(maximumConcurrency); // How to make it ordered?
这是一个用法示例:
var source = Observable
.Interval(TimeSpan.FromMilliseconds(300))
.Take(4)
.Select(x => Observable
.Interval(TimeSpan.FromMilliseconds(200))
.Select(y => $"x + 1-(char)(65 + y)")
.Take(3));
var results = await source.MergeOrdered(2).ToArray();
Console.WriteLine($"Results: String.Join(", ", results)");
输出(不良):
Results: 1-A, 1-B, 2-A, 1-C, 2-B, 3-A, 2-C, 3-B, 4-A, 3-C, 4-B, 4-C
理想的输出是:
Results: 1-A, 1-B, 1-C, 2-A, 2-B, 2-C, 3-A, 3-B, 3-C, 4-A, 4-B, 4-C
澄清:关于值的顺序,值本身是无关紧要的。重要的是它们起源的内部序列的顺序,以及它们在该序列中的位置。第一个内部序列中的所有值应首先发出(按其原始顺序),然后是第二个内部序列中的所有值,然后是第三个内部序列中的所有值,依此类推。
【问题讨论】:
This 是我能找到的最接近的“重复”。它有点相关,但有一些具体的细微差别,使它对我的问题既宽又窄。 有趣,你有这个可以玩,还有源代码,也许可以让你走上正轨:rxmarbles.com 您是否考虑过从源中的冷的 observable 创建热的 observable,然后只使用Concat
?
@Kamushek 是的,我也有这个想法,通过使用 Publish
运算符。但是我正在失去价值,而且我还没有设法将它与maximumConcurrency
功能结合起来。
@aybe rxmarbles.com 是否允许创建请求的MergeOrdered
运算符的这个问题中显示的复杂弹珠图? AFAICS 我只能选择一个预定义的运算符,然后用鼠标左右移动图表项目符号。
【参考方案1】:
这个 observable 无法知道任何内部 observable 的最后一个值是否是应该产生的第一个值。
例如,您可以这样:
Stream of observables: +--1---2---3--|
Observable-1 : +------------B--------A-|
Observable-2 : +--C--------D-|
Observable-3 : +-E--------F-|
Desirable merging : +------------------------ABCDEF|
在这种情况下,我会这样做:
IObservable<char> query =
sources
.ToObservable()
.Merge()
.ToArray()
.SelectMany(xs => xs.OrderBy(x => x));
【讨论】:
嗨 Enigmativity,感谢您的回答!我可能没有很好地解释这些要求。价值观本身并不重要。主要目标是保持内部序列的顺序。在您的示例中,理想的结果是+---------------B--------ACDEF|
,因为第一个序列发出 B-A、第二个 C-D 和第三个 E-F。我编辑了问题以使要求更清晰。【参考方案2】:
我通过结合使用Merge
、Merge(1)
¹ 和Replay
运算符找到了解决此问题的方法。 Merge
运算符强制执行并发策略,Merge(1)
运算符强制执行有序的顺序发射。为了防止Merge
弄乱发出值的顺序,引入了内部序列的额外包装。每个内部序列都被投影到一个IObservable<IObservable<T>>
,它立即发出内部序列,然后在内部序列完成时完成。这种包装是使用Observable.Create
方法实现的:
public static IObservable<T> MergeOrdered<T>(
this IObservable<IObservable<T>> source,
int maximumConcurrency = Int32.MaxValue)
return source.Select(inner => inner.Replay(buffered => Observable
.Create<IObservable<T>>(observer =>
observer.OnNext(buffered);
return buffered.Subscribe(_ => , observer.OnError, observer.OnCompleted);
)))
.Merge(maximumConcurrency)
.Merge(1);
Replay
操作符会缓冲内部序列发出的所有消息,这样它们在Merge
订阅和Merge(1)
订阅之间不会丢失。
有趣的是,由于包装,一个中间IObservable<IObservable<IObservable<T>>>
序列被创建。然后这个可怕的东西被解开两次,第一次是 Merge
,第二次是 Merge(1)
操作符。
这不是一个完美有效的解决方案,因为没有理由对Merge(1)
当前订阅的内部序列进行缓冲。不过,优化这种低效率并非易事,所以我将保持原样。在每个子序列包含少量元素的情况下,此缺陷的影响应该可以忽略不计。在这些情况下,尝试修复它甚至可能弊大于利。
¹ 理想情况下,我想使用Concat
而不是等效但效率较低的Merge(1)
运算符。不幸的是,当前版本的 Rx 库 (5.0.0) 中的 Concat
运算符 behaves weirdly。我什至在一个相当复杂的查询中使用Concat
时遇到了死锁行为,通过切换到Merge(1)
运算符来解决。
注意:可以在1st revision 中找到此答案的原始实现,其中包含用于控制并发性的SemaphoreSlim
而不是Merge
运算符。基于Merge
的实现应该更好,因为它不涉及即发即弃的任务延续,并且对内部序列的订阅同步发生,而不是被卸载到ThreadPool
。
【讨论】:
以上是关于如何合并多个 observables 并保持顺序和最大并发?的主要内容,如果未能解决你的问题,请参考以下文章
RxJava 合并组合两个(或多个)Observable数据源