TPL 数据流与普通信号量
Posted
技术标签:
【中文标题】TPL 数据流与普通信号量【英文标题】:TPL Dataflow vs plain Semaphore 【发布时间】:2019-01-08 00:12:55 【问题描述】:我需要制作可扩展的流程。该进程主要有 I/O 操作和一些次要的 CPU 操作(主要是反序列化字符串)。该过程在数据库中查询 url 列表,然后从这些 url 中获取数据,将下载的数据反序列化为对象,然后将一些数据持久化到 crm 动态中以及另一个数据库中。之后我需要更新第一个处理了 url 的数据库。部分要求是使并行度可配置。
最初我想通过等待的任务序列来实现它,并使用信号量限制并行性 - 非常简单。然后我在这里阅读了@Stephen Cleary 的一些帖子和答案,它推荐使用 TPL Dataflow,我认为它可能是一个不错的选择。但是,我想确保通过使用 Dataflow 使代码“复杂化”,这是一个有价值的原因。我还建议使用ForEachAsync extension method,它也很容易使用,但是我不确定它是否会因为它对集合进行分区的方式而导致内存开销。
对于这种情况,TPL 数据流是一个不错的选择吗?它比 Semaphore 或 ForEachAsync 方法有什么好处 - 如果我通过 TPL DataFlow 实现它而不是其他每个选项(Semaphore/ForEachASync),我将实际获得什么好处?
【问题讨论】:
Tpl Dataflow 更适合 cpu 工作。对于异步 I/O 调用,我会将 Task.WhenAll 用于一系列任务 @PeterBons - 我的场景主要有 I/O 调用,但也有一些 cpu 工作(例如反序列化文件的内容),我可以用信号量来实现它,但我的印象是使用 Tpl Dataflow 会提高性能,但我仍然不确定我是否完全理解 Dataflow 的好处,因此我可以确定它们是否值得,因为它可能会使我的代码比仅使用信号量更复杂? 我真的很想得到一些关于这个的专家意见。我正在做与您完全相同的事情,无法在信号量和 TPL 数据流之间做出决定。我倾向于使用ActionBlock
和 MaxDegreeOfParallelism
作为可配置的。据我了解,TPL 以高效的方式为您管理线程池,但有some other issues。我想保持简单,只是限制一次运行的任务数量,你也是这样做的吗?
哦,顺便说一句,看看@Stephen Cleary 的answer。 TPL Dataflow is great, especially if you're looking to limit work in one part of a larger pipeline
但是,如果只有一次限制动作,那么信号量就足够了。
@TheUknown - 好消息,我们得到了专家的答复 :) 我的目标不仅仅是限制任务的数量,还要确保整个过程尽快完成,知道写入 Crm 的部分是主要瓶颈。感谢您提供对其他答案的评论参考,它也提供了丰富的信息,适合我的情况。
【参考方案1】:
进程主要有IO操作和一些次要的CPU操作(主要是反序列化字符串)。
这几乎只是 I/O。除非这些字符串巨大,否则反序列化不值得并行化。你正在做的那种 CPU 工作会在噪音中消失。
因此,您需要关注并发异步。
如您所见,SemaphoreSlim
是标准模式。
TPL 数据流还可以执行并发(异步和并行形式)。
ForEachAsync
可以有多种形式;请注意,在您引用的blog post 中,此方法有 5 个不同的实现,每个实现都是有效的。 “[T] 迭代可能有许多不同的语义,每一种都会导致不同的设计选择和实现。”出于您的目的(不希望 CPU 并行化),您不应考虑使用 Task.Run
或分区的那些。在异步并发世界中,任何ForEachAsync
实现都只是语法糖,它隐藏了它实现的语义,这就是我倾向于避免它的原因。
这会让您选择SemaphoreSlim
与ActionBlock
。我通常建议人们首先从SemaphoreSlim
开始,如果他们的需求变得更复杂(在某种程度上他们会从数据流管道中受益),可以考虑迁移到 TPL Dataflow。
例如,“部分要求是使并行度可配置。”
您可以从允许一定程度的并发开始 - 被限制的事情是单个整体操作(从 url 获取数据,将下载的数据反序列化为对象,持久化到 crm 动态和另一个数据库中,然后更新第一个数据库)。这就是SemaphoreSlim
的完美解决方案。
但您可能决定要使用多个旋钮:例如,一个并发度用于下载多少个 url,一个单独的并发度用于持久化,以及一个单独的并发度用于更新原始数据库。然后您还需要限制这些点之间的“队列”:内存中只有这么多的反序列化对象等 - 以确保使用慢速数据库的快速 url 不会导致您的应用程序使用太多问题记忆。如果这些是有用的语义,那么您已经开始从数据流的角度解决问题,这就是您可能会更好地使用像 TPL Dataflow 这样的库。
【讨论】:
非常感谢您的回答,它详细而清晰,您甚至提到了我提到的所有选项,包括 ForEachAsync! +100 :)【参考方案2】:以下是信号量方法的卖点:
-
简单
以下是 TPL 数据流 方法的卖点:
-
数据并行之上的任务并行
资源(带宽、CPU、数据库连接)的最佳利用
每个异构操作的并行度可配置
减少内存占用
让我们以下面的 Semaphore 实现为例:
string[] urls = FetchUrlsFromDB();
var cts = new CancellationTokenSource();
var semaphore = new SemaphoreSlim(10); // Degree of parallelism (DOP)
Task[] tasks = urls.Select(url => Task.Run(async () =>
await semaphore.WaitAsync(cts.Token);
try
string rawData = DownloadData(url);
var data = Deserialize(rawData);
PersistToCRM(data);
MarkAsCompleted(url);
finally
semaphore.Release();
)).ToArray();
Task.WaitAll(tasks);
上述实现可确保在任何给定时刻最多同时处理 10 个 url。但是,这些并行工作流之间不会有协调。因此,例如,完全有可能在给定时刻所有 10 个并行工作流都将下载数据,在另一个时刻所有 10 个将反序列化原始数据,而在另一个时刻所有 10 个将数据持久化到 CRM。这远非理想。 理想情况下,您希望整个操作的瓶颈,无论是网络适配器、CPU 还是数据库服务器,始终不间断地工作,并且不会在各种随机时刻未充分利用(或完全空闲)。
另一个考虑因素是每个异构操作的最佳并行化程度。 10 DOP 可能对于与 Web 的通信是最佳的,但对于与数据库的通信来说太低或太高。信号量方法不允许这种级别的微调。您唯一的选择是通过选择介于这些最优值之间的 DOP 值来进行折衷。
如果 url 的数量非常大,比如说 1,000,000,那么上面的 Semaphore 方法也会引起严重的内存使用问题。一个 url 的平均大小可能为 50 字节,而连接到 CancellationToken
的 Task
可能要重 10 倍或更多。当然,您可以更改实现并以一种不会产生太多任务的更聪明的方式使用SemaphoreSlim
,但这会违背这种方法的主要(也是唯一)卖点,即其简单性。
TPL Dataflow 库解决了所有这些问题,代价是能够驯服这个强大的工具所需的(较小的)学习曲线。
【讨论】:
以上是关于TPL 数据流与普通信号量的主要内容,如果未能解决你的问题,请参考以下文章