异步流与响应式扩展相比如何?

Posted

技术标签:

【中文标题】异步流与响应式扩展相比如何?【英文标题】:How Async streams compares to reactive extension? 【发布时间】:2020-02-16 16:12:12 【问题描述】:

如何比较以下两者? Rx 是否更强大?

反应式扩展:

var observable = Observable.Create<char>(async (observer, cancel) =>

    while (true)
    
        string line = await sr.ReadLineAsync();
        if (line == null)
            break;
        observer.OnNext(line);
    
);

observable.Subscribe(
    c => Console.WriteLine(c.ToString()),
    () => end.Dispose());

异步流:

public async void Run(string path)

    await foreach (var line in TestAsync())
    
        Console.WriteLine(line);
    


private async IAsyncEnumerable<string> TestAsync()

    while (true)
    
        string line = await sr.ReadLineAsync();
        if (line == null)
            break;
        yield return line;
    

【问题讨论】:

最显着的区别是异步枚举仍然是基于拉的,而 Rx 提供基于推送的通知。换句话说,Rx 增加了一个维度,时间。如果您的数据已经可以获取,那么 pull 就足够了。如果不是,基于声明式推送的编程通常会产生更简洁的代码。这是一个更详细的blog。 Rx 开始使用异步流。异步流在较低级别工作 我写过一个相关主题的博客:C# events as asynchronous streams with ReactiveX or Channels。 【参考方案1】:

这两个功能协同工作。 PS:忘记async streams,想想await foreach

异步流

异步流是允许异步迭代的(相对)低级功能。它本身不提供任何其他功能,如过滤、聚合等。它是基于拉的,而 Rx 是基于推送的。

您可以通过 .....the ReacticeX.NET Github repo 中的 System.Linq.Async 库在异步流上使用 LINQ 运算符。它很快,但不提供 Rx 的事件处理功能。

例如,没有时间的概念,更不用说使用自定义调度程序的方法了。没有订阅,没有错误事件。 GroupBy 将消耗整个源并将组项作为单独的IAsyncEnumerable 实例发出,而 Rx 的 GroupBy 将为每个组发出单独的 Observables。

在问题的示例中,IAsyncEnumerable 很自然,因为不涉及事件逻辑,只是在异步迭代器上进行迭代。

如果示例尝试轮询远程服务并检测故障峰值(即每个间隔的故障数超过阈值),IAsyncEnumerable 将不合适,因为它会阻止等待所有响应。事实上,我们根本无法聚合每个时间的事件。

线程

真的没有 - IAsyncEnumerable 或 await foreach 调用不指定事件是如何产生或消费的。如果我们想使用单独的任务来处理一个项目,我们必须自己创建它,例如:

public async Task Run(string path)

    await foreach (var line in LoadStockTrades())
    
        var result = await Task.Run(()=>AnalyzeTrade(line));
        Console.WriteLine($"result : line);
    

反应式扩展

Reactive Extensions 是一个处理事件流的高级库。它是基于推送的,它理解时间,但它也比异步流或通道等较低级别的构造慢。

在问题的示例中,Rx 将是矫枉过正。轮询和检测尖峰很容易,有多个窗口选项。

System.Linq.Async 可以使用 ToObservable 从 IAsyncEnumerable 创建 Observable,这意味着 IAsyncEnumerable 可以用作 Rx 的源。

线程

默认情况下,Rx 是单线程的,这对于它的主要场景 - 事件流处理非常有意义。

另一方面,Rx 允许发布者、订阅者和操作者在相同或不同的线程上运行。在没有具有async/await 或DataFlow 的语言(例如Java、javascript)中,Rx 用于通过在不同线程上运行发布者和订阅者来模拟并发处理管道。

【讨论】:

我有段时间想用Rx,但是非RX也可以轻松解决这些问题。在通常的业务应用程序中,Rx 不会过度杀伤的用例似乎并不多。 @ca9163d9 Rx 在其核心场景——事件流处理中非常有用。在 Java、JavaScript 等其他语言中,它用于模拟数据流和异步处理,但 C# 已经为此提供了更好的构造。您要计算数量或请求或失败吗?您可以使用Window,在窗口中收集事件,Count()Where() 仅在失败次数超过限制时检索通知。在没有 Rx 的情况下,这需要 大量 的编码。 @ca9163d9 移动欺诈检测? Azure 事件分析的一个示例是检测间隔内来自距离太远的基站的呼叫 - 按 SIM 分组、窗口批量输入、聚合以计算呼叫事件之间的距离、在哪里过滤阈值之外的呼叫 PS "await foreach" 的百万加分。我对“异步流”的主要抱怨是建立在“可枚举”的混淆之上的令人困惑的术语。 Observables 也是可枚举的,只是 .NET 中的 Enumerable 实际上应该称为“Iterable”。换句话说,一件一件地得到一些东西。在我看来,虽然异步“让我看到下一个出现的东西”的想法与“只要我的 observable 上出现某些东西就做某事”几乎是一回事。与以往一样,架构师将他们自己的概念混淆传播到语言模型中,然后传播给开发人员。 哦,Rx 绝对是很棒的东西。我想说大多数开发人员的工作最好使用 Observables 来完成,只要开发人员社区和文献中的其他人也有同样的感受。

以上是关于异步流与响应式扩展相比如何?的主要内容,如果未能解决你的问题,请参考以下文章

浅析Java响应式编程(Reactive Programming)

如何动态启用/禁用响应式扩展

当响应式编程遇上springboot

初探响应式编程框架Combine构建的分工协作体系

简单对比vue2.x与vue3.x响应式及新功能

当RX遇上Spring——Spring 5新特性之响应式Web应用