异步流与响应式扩展相比如何?
Posted
技术标签:
【中文标题】异步流与响应式扩展相比如何?【英文标题】:How Async streams compares to reactive extension? 【发布时间】:2020-02-16 16:12:12 【问题描述】:如何比较以下两者? Rx 是否更强大?
反应式扩展:
var observable = Observable.Create<char>(async (observer, cancel) =>
while (true)
string line = await sr.ReadLineAsync();
if (line == null)
break;
observer.OnNext(line);
);
observable.Subscribe(
c => Console.WriteLine(c.ToString()),
() => end.Dispose());
异步流:
public async void Run(string path)
await foreach (var line in TestAsync())
Console.WriteLine(line);
private async IAsyncEnumerable<string> TestAsync()
while (true)
string line = await sr.ReadLineAsync();
if (line == null)
break;
yield return line;
【问题讨论】:
最显着的区别是异步枚举仍然是基于拉的,而 Rx 提供基于推送的通知。换句话说,Rx 增加了一个维度,时间。如果您的数据已经可以获取,那么 pull 就足够了。如果不是,基于声明式推送的编程通常会产生更简洁的代码。这是一个更详细的blog。 Rx 开始使用异步流。异步流在较低级别工作 我写过一个相关主题的博客:C# events as asynchronous streams with ReactiveX or Channels。 【参考方案1】:这两个功能协同工作。 PS:忘记async streams
,想想await foreach
。
异步流
异步流是允许异步迭代的(相对)低级功能。它本身不提供任何其他功能,如过滤、聚合等。它是基于拉的,而 Rx 是基于推送的。
您可以通过 .....the ReacticeX.NET Github repo 中的 System.Linq.Async 库在异步流上使用 LINQ 运算符。它很快,但不提供 Rx 的事件处理功能。
例如,没有时间的概念,更不用说使用自定义调度程序的方法了。没有订阅,没有错误事件。 GroupBy 将消耗整个源并将组项作为单独的IAsyncEnumerable
实例发出,而 Rx 的 GroupBy 将为每个组发出单独的 Observables。
在问题的示例中,IAsyncEnumerable 很自然,因为不涉及事件逻辑,只是在异步迭代器上进行迭代。
如果示例尝试轮询远程服务并检测故障峰值(即每个间隔的故障数超过阈值),IAsyncEnumerable 将不合适,因为它会阻止等待所有响应。事实上,我们根本无法聚合每个时间的事件。
线程
真的没有 - IAsyncEnumerable 或 await foreach
调用不指定事件是如何产生或消费的。如果我们想使用单独的任务来处理一个项目,我们必须自己创建它,例如:
public async Task Run(string path)
await foreach (var line in LoadStockTrades())
var result = await Task.Run(()=>AnalyzeTrade(line));
Console.WriteLine($"result : line);
反应式扩展
Reactive Extensions 是一个处理事件流的高级库。它是基于推送的,它理解时间,但它也比异步流或通道等较低级别的构造慢。
在问题的示例中,Rx 将是矫枉过正。轮询和检测尖峰很容易,有多个窗口选项。
System.Linq.Async 可以使用 ToObservable 从 IAsyncEnumerable 创建 Observable,这意味着 IAsyncEnumerable 可以用作 Rx 的源。
线程
默认情况下,Rx 是单线程的,这对于它的主要场景 - 事件流处理非常有意义。
另一方面,Rx 允许发布者、订阅者和操作者在相同或不同的线程上运行。在没有具有async/await
或DataFlow 的语言(例如Java、javascript)中,Rx 用于通过在不同线程上运行发布者和订阅者来模拟并发处理管道。
【讨论】:
我有段时间想用Rx,但是非RX也可以轻松解决这些问题。在通常的业务应用程序中,Rx 不会过度杀伤的用例似乎并不多。 @ca9163d9 Rx 在其核心场景——事件流处理中非常有用。在 Java、JavaScript 等其他语言中,它用于模拟数据流和异步处理,但 C# 已经为此提供了更好的构造。您要计算数量或请求或失败吗?您可以使用Window
,在窗口中收集事件,Count()
和Where()
仅在失败次数超过限制时检索通知。在没有 Rx 的情况下,这需要 大量 的编码。
@ca9163d9 移动欺诈检测? Azure 事件分析的一个示例是检测间隔内来自距离太远的基站的呼叫 - 按 SIM 分组、窗口批量输入、聚合以计算呼叫事件之间的距离、在哪里过滤阈值之外的呼叫
PS "await foreach" 的百万加分。我对“异步流”的主要抱怨是建立在“可枚举”的混淆之上的令人困惑的术语。 Observables 也是可枚举的,只是 .NET 中的 Enumerable 实际上应该称为“Iterable”。换句话说,一件一件地得到一些东西。在我看来,虽然异步“让我看到下一个出现的东西”的想法与“只要我的 observable 上出现某些东西就做某事”几乎是一回事。与以往一样,架构师将他们自己的概念混淆传播到语言模型中,然后传播给开发人员。
哦,Rx 绝对是很棒的东西。我想说大多数开发人员的工作最好使用 Observables 来完成,只要开发人员社区和文献中的其他人也有同样的感受。以上是关于异步流与响应式扩展相比如何?的主要内容,如果未能解决你的问题,请参考以下文章