Reactive Framework、PLINQ、TPL 和并行扩展如何相互关联?

Posted

技术标签:

【中文标题】Reactive Framework、PLINQ、TPL 和并行扩展如何相互关联?【英文标题】:How do Reactive Framework, PLINQ, TPL and Parallel Extensions relate to each other? 【发布时间】:2011-01-09 10:47:29 【问题描述】:

至少自 .NET 4.0 发布以来,微软似乎在支持并行和异步编程方面付出了很多努力,并且似乎出现了很多围绕此的 API 和库。尤其是下面这些花哨的名字最近到处都在被提及:

反应式框架, PLINQ(并行 LINQ), TPL(任务并行库)和 并行扩展。

现在它们似乎都是 Microsoft 产品,而且它们似乎都针对 .NET 的异步或并行编程场景。但目前尚不清楚它们中的每一个究竟是什么以及它们之间的关系。有些可能实际上是一样的。

简而言之,谁能直接说明什么是什么?

【问题讨论】:

我已经编辑了这个问题,试图澄清一下,并投票决定重新打开它。也就是说,我不是 .NET 专家,所以我可能把它弄得一团糟;如果您认为我在某个地方犯了错误,请随时对其进行进一步编辑(甚至完全还原我的更改)。另外,坦率地说,我仍然认为这个问题感觉太宽泛且不清楚,不适合 Stack Overflow;也就是说,鉴于它确实有一个高度赞成和接受的答案,我愿意接受它可能是一个例外。 我认为每个人都知道上述 4 件事(即每个可能回答这个问题的人)都会马上知道这个问题是关于什么的。赞成票和答案证明了这一点。无论如何,我已经编辑了问题,使其尽可能清晰。 【参考方案1】:

PLINQ(Parallel Linq)只是一种编写常规 Linq 查询的新方法,以便它们并行运行 - 换句话说,框架将自动处理跨多个线程运行您的查询,因此它们完成得更快(即使用多个 CPU 内核)。

例如,假设您有一堆字符串,并且您想要获取所有以字母“A”开头的字符串。你可以这样写你的查询:

var words = new[]  "Apple", "Banana", "Coconut", "Anvil" ;
var myWords = words.Select(s => s.StartsWith("A"));

这很好用。但是,如果您要搜索 50,000 个单词,您可能希望利用每个测试都是独立的这一事实,并将其拆分到多个核心:

var myWords = words.AsParallel().Select(s => s.StartsWith("A"));

这就是将常规查询转换为在多个内核上运行的并行查询所需要做的一切。很整洁。


TPL(任务并行库)是对 PLINQ 的一种补充,它们共同构成了并行扩展。 PLINQ 主要基于 函数式 编程风格,没有副作用,而副作用正是 TPL 的用途。如果您想真正做工作而不是仅仅并行搜索/选择事物,您可以使用 TPL。

TPL 本质上是Parallel 类,它公开了ForForeachInvoke 的重载。 Invoke 有点像在ThreadPool 中排队任务,但使用起来更简单一些。 IMO,更有趣的位是ForForeach。例如,假设您有一大堆要压缩的文件。您可以编写常规的顺序版本:

string[] fileNames = (...);
foreach (string fileName in fileNames)

    byte[] data = File.ReadAllBytes(fileName);
    byte[] compressedData = Compress(data);
    string outputFileName = Path.ChangeExtension(fileName, ".zip");
    File.WriteAllBytes(outputFileName, compressedData);

同样,此压缩的每次迭代都完全独立于其他任何迭代。我们可以通过同时执行几个来加快速度:

Parallel.ForEach(fileNames, fileName =>

    byte[] data = File.ReadAllBytes(fileName);
    byte[] compressedData = Compress(data);
    string outputFileName = Path.ChangeExtension(fileName, ".zip");
    File.WriteAllBytes(outputFileName, compressedData);
);

再一次,这就是并行化此操作所需的全部内容。现在,当我们运行 CompressFiles 方法(或我们决定调用它的任何名称)时,它将使用多个 CPU 内核,并且可能会在一半或 1/4 的时间内完成。

与将其全部放入ThreadPool 相比,它的优势在于它实际上同步运行。如果您改用ThreadPool(或只是简单的Thread 实例),您必须想出一种方法来找出所有任务何时完成,虽然这不是非常 很复杂,这是很多人倾向于搞砸或至少遇到麻烦的事情。当您使用Parallel 类时,您实际上不必考虑它;多线程方面对您隐藏,一切都在幕后处理。


Reactive Extensions (Rx) 真的是完全不同的野兽。这是考虑事件处理的不同方式。确实有很多材料要介绍,但长话短说,Rx 不是将事件处理程序连接到事件,而是让您将事件序列视为......以及序列 (IEnumerable<T>)。您可以以迭代方式处理事件,而不是让它们在随机时间异步触发,在这种情况下,您必须一直保存状态才能检测以特定顺序发生的一系列事件。

我发现的关于 Rx 的最酷的例子之一是 here。跳到“Linq to IObservable”部分,他仅用 4 行代码实现了拖放处理程序,这在 WPF 中通常是一个难题。 Rx 为您提供了事件的组合,这是常规事件处理程序所没有的,并且像这样的代码 sn-ps 也可以直接重构为您可以在任何地方使用的行为类。


就是这样。这些是 .NET 4.0 中可用的一些更酷的功能。当然还有更多,但这些是你问的!

【讨论】:

这些在 .net 3.5 或更低版本中可用吗? @Myster:RX 可用 (msdn.microsoft.com/en-us/devlabs/ee794896.aspx)。 PLINQ 很久以前作为 CTP 提供,但不再提供 - 您必须为此进行升级。 与 C# 异步 CTP 一起出现的 TPL 数据流库 (TDF) 看起来非常好。它应该使 .NET 中间件更容易保持在并行处理的“性能最佳点”内。为了获得最佳性能(考虑吞吐量和延迟),您设置了限制。因此,每个处理“块”都可以有自己的“缓冲区上限”,并且可以有并行度上限。您还可以告诉每个块处理批处理而不是单个消息。这看起来很棒。【参考方案2】:

我喜欢 Aaronaught 的回答,但我会说 Rx 和 TPL 解决不同的问题。 TPL 团队添加的部分内容是线程原语和对运行时构建块(如 ThreadPool)的显着增强。您列出的所有内容都建立在这些原语和运行时功能之上。

但是 TPL 和 Rx 解决了两个不同的问题。当程序或算法是“拉和排队”时,TPL 效果最好。当程序或算法需要对来自流的数据(如鼠标输入或从 WCF 等端点接收相关消息流)进行“反应”时,Rx 表现出色。

您需要 TPL 的“工作单元”概念来像文件系统一样工作、迭代集合或像组织结构图一样遍历层次结构。在每种情况下,程序员都可以推断出工作的总量,可以将工作分解为一定大小的块(任务),并且在通过层次结构进行计算的情况下,可以将任务“链接”在一起.因此,某些类型的工作适用于 TPL 的“任务层次结构”模型,并受益于取消等管道的增强(参见 CancellationTokenSource 上的第 9 频道视频)。 TPL 也有很多专门领域的旋钮,比如近实时数据处理。

Rx 将是大多数开发人员最终应该使用的。这是 WPF 应用程序如何对外部消息如外部数据(到 IM 客户端的 IM 消息流)或外部输入(如从 Aaronaught 链接的鼠标拖动示例)之类的外部消息“做出反应”的方式。在幕后,Rx 使用来自 TPL/BCL 的线程原语、来自 TPL/BCL 的线程安全集合,以及像 ThreadPool 这样的运行时对象。在我看来,Rx 是表达意图的“***别”编程。

普通开发人员是否能够理解您可以使用 Rx 表达的一系列意图还有待观察。 :)

但我认为在接下来的几年中,TPL 与 Rx 将成为下一场辩论,例如 LINQ-to-SQL 与实体框架。在同一个领域中有两种 API 风格,专门用于不同的场景,但在很多方面重叠。但在 TPL 和 Rx 的情况下,它们实际上是相互了解的,并且有内置的适配器来组合应用程序并一起使用这两个框架(例如将 PLINQ 循环的结果馈送到 IObservable Rx 流中)。对于没有做过任何并行编程的人来说,需要大量学习才能跟上进度。

更新:过去 6 个月(自我最初回答以来的 18 个月),我在常规工作中一直使用 TPL 和 RxNet。我在中间层 WCF 服务(企业 LOB 服务)中选择 TPL 和/或 RxNet 的想法:http://yzorgsoft.blogspot.com/2011/09/middle-tier-tpl-andor-rxnet.html

【讨论】:

要注意的另一件事是,TPL 和 TPL 数据流从假设您想要并行处理开始。默认情况下,Rx 以“丰富的事件”开始,因此它会让您跳过箍来进行并行处理(例如增加并行处理的并发项目的数量)。请参阅 "think of IObservable as thread safe" 和 "how to process Queue asyncchronously using Rx"。在我看来,accidentally use blocking operations in Rx 也太容易了

以上是关于Reactive Framework、PLINQ、TPL 和并行扩展如何相互关联?的主要内容,如果未能解决你的问题,请参考以下文章

并行LINQ PLinq

在 PLINQ 中绑定源线程

PLINQ 更新失败

如何使用PLINQ

27.7 并行语言集成查询(PLinq)

何时使用 Parallel.ForEach,何时使用 PLINQ