如何合并多个 observables 并保持顺序和最大并发?

Posted

技术标签:

【中文标题】如何合并多个 observables 并保持顺序和最大并发?【英文标题】:How to merge multiple observables with order preservation and maximum concurrency? 【发布时间】:2021-02-26 15:58:19 【问题描述】:

我搜索了一个副本,但没有找到。我拥有的是一个嵌套的 observable IObservable<IObservable<T>>,我想将它展平为 IObservable<T>。我不想使用 Concat 运算符,因为它会延迟对每个内部 observable 的订阅,直到前一个 observable 完成。这是一个问题,因为内部 observable 很冷,我希望它们在外部 observable 发出 T 值后立即开始发出。我也不想使用Merge 运算符,因为它会打乱发出值的顺序。下面的大理石图显示了Merge 运算符的有问题的(就我而言)行为,以及理想的合并行为。

Stream of observables: +----1------2-----3----|
Observable-1         :      +--A-----------------B-------|
Observable-2         :             +---C---------------------D------|
Observable-3         :                   +--E--------------------F-------|
Merge (undesirable)  : +-------A-------C----E----B-----------D---F-------|
Desirable merging    : +-------A-----------------B-------C---D------EF---|

Observable-1 发出的所有值都应该在 Observable-2 发出的任何值之前。 Observable-2 和 Observable-3 也应如此。

我喜欢Merge 运算符的原因是它允许配置对内部可观察对象的最大并发订阅。我想使用我正在尝试实现的自定义 MergeOrdered 运算符保留此功能。这是我正在构建的方法:

public static IObservable<T> MergeOrdered<T>(
    this IObservable<IObservable<T>> source,
    int maximumConcurrency = Int32.MaxValue)

    return source.Merge(maximumConcurrency); // How to make it ordered?

这是一个用法示例:

var source = Observable
    .Interval(TimeSpan.FromMilliseconds(300))
    .Take(4)
    .Select(x => Observable
        .Interval(TimeSpan.FromMilliseconds(200))
        .Select(y => $"x + 1-(char)(65 + y)")
        .Take(3));

var results = await source.MergeOrdered(2).ToArray();
Console.WriteLine($"Results: String.Join(", ", results)");

输出(不良):

Results: 1-A, 1-B, 2-A, 1-C, 2-B, 3-A, 2-C, 3-B, 4-A, 3-C, 4-B, 4-C

理想的输出是:

Results: 1-A, 1-B, 1-C, 2-A, 2-B, 2-C, 3-A, 3-B, 3-C, 4-A, 4-B, 4-C

澄清:关于值的顺序,值本身是无关紧要的。重要的是它们起源的内部序列的顺序,以及它们在该序列中的位置。第一个内部序列中的所有值应首先发出(按其原始顺序),然​​后是第二个内部序列中的所有值,然后是第三个内部序列中的所有值,依此类推。

【问题讨论】:

This 是我能找到的最接近的“重复”。它有点相关,但有一些具体的细微差别,使它对我的问题既宽又窄。 有趣,你有这个可以玩,还有源代码,也许可以让你走上正轨:rxmarbles.com 您是否考虑过从源中的冷的 observable 创建热的 observable,然后只使用 Concat @Kamushek 是的,我也有这个想法,通过使用 Publish 运算符。但是我正在失去价值,而且我还没有设法将它与maximumConcurrency 功能结合起来。 @aybe rxmarbles.com 是否允许创建请求的MergeOrdered 运算符的这个问题中显示的复杂弹珠图? AFAICS 我只能选择一个预定义的运算符,然后用鼠标左右移动图表项目符号。 【参考方案1】:

这个 observable 无法知道任何内部 observable 的最后一个值是否是应该产生的第一个值。

例如,您可以这样:

Stream of observables: +--1---2---3--|
Observable-1         :    +------------B--------A-|
Observable-2         :        +--C--------D-|
Observable-3         :            +-E--------F-|
Desirable merging    : +------------------------ABCDEF|

在这种情况下,我会这样做:

IObservable<char> query =
    sources
        .ToObservable()
        .Merge()
        .ToArray()
        .SelectMany(xs => xs.OrderBy(x => x));

【讨论】:

嗨 Enigmativity,感谢您的回答!我可能没有很好地解释这些要求。价值观本身并不重要。主要目标是保持内部序列的顺序。在您的示例中,理想的结果是+---------------B--------ACDEF|,因为第一个序列发出 B-A、第二个 C-D 和第三个 E-F。我编辑了问题以使要求更清晰。【参考方案2】:

我通过结合使用MergeMerge(1)¹ 和Replay 运算符找到了解决此问题的方法。 Merge 运算符强制执行并发策略,Merge(1) 运算符强制执行有序的顺序发射。为了防止Merge 弄乱发出值的顺序,引入了内部序列的额外包装。每个内部序列都被投影到一个IObservable&lt;IObservable&lt;T&gt;&gt;,它立即发出内部序列,然后在内部序列完成时完成。这种包装是使用Observable.Create 方法实现的:

public static IObservable<T> MergeOrdered<T>(
    this IObservable<IObservable<T>> source,
    int maximumConcurrency = Int32.MaxValue)

    return source.Select(inner => inner.Replay(buffered => Observable
        .Create<IObservable<T>>(observer =>
    
        observer.OnNext(buffered);
        return buffered.Subscribe(_ =>  , observer.OnError, observer.OnCompleted);
    )))
    .Merge(maximumConcurrency)
    .Merge(1);

Replay 操作符会缓冲内部序列发出的所有消息,这样它们在Merge 订阅和Merge(1) 订阅之间不会丢失。

有趣的是,由于包装,一个中间IObservable&lt;IObservable&lt;IObservable&lt;T&gt;&gt;&gt; 序列被创建。然后这个可怕的东西被解开两次,第一次是 Merge,第二次是 Merge(1) 操作符。

这不是一个完美有效的解决方案,因为没有理由对Merge(1) 当前订阅的内部序列进行缓冲。不过,优化这种低效率并非易事,所以我将保持原样。在每个子序列包含少量元素的情况下,此缺陷的影响应该可以忽略不计。在这些情况下,尝试修复它甚至可能弊大于利。

¹ 理想情况下,我想使用Concat 而不是等效但效率较低的Merge(1) 运算符。不幸的是,当前版本的 Rx 库 (5.0.0) 中的 Concat 运算符 behaves weirdly。我什至在一个相当复杂的查询中使用Concat 时遇到了死锁行为,通过切换到Merge(1) 运算符来解决。


注意:可以在1st revision 中找到此答案的原始实现,其中包含用于控制并发性的SemaphoreSlim 而不是Merge 运算符。基于Merge 的实现应该更好,因为它不涉及即发即弃的任务延续,并且对内部序列的订阅同步发生,而不是被卸载到ThreadPool

【讨论】:

以上是关于如何合并多个 observables 并保持顺序和最大并发?的主要内容,如果未能解决你的问题,请参考以下文章

RxJS forkJoin 会按顺序返回结果吗?

结合多个条件 Observable 来准备数据

RxJava 合并组合两个(或多个)Observable数据源

如何在c#中保持内部顺序的同时将2个排序的列表合并到一个随机列表中

合并 Observable 列表并等待所有完成

合并多个文件,同时保持其身份