使用 Task.Run() 时的延迟
Posted
技术标签:
【中文标题】使用 Task.Run() 时的延迟【英文标题】:Delay when using Task.Run() 【发布时间】:2021-12-10 20:42:56 【问题描述】:我正在通过 ZMQ 读取数据并根据主题在单独的线程中执行任务。我注意到当数据频率非常高时(每 1ms 约 1 条消息),一些任务需要很长时间才能执行。
这基本上就是我正在做的事情:
while(true)
item = zmqSubscriber.ReceiveData(out topic, out ConsumeErrorMsg);
if (topic.Equals(topic1))
Task.Run(() => ExecuteTask1(item));
else if (topic.Equals(topic2)
Task.Run(() => ExecuteTask2(item));
else if (topic.Equals(topic3))
Task.Run(() => ExecuteTask3(item));
当数据频率稍低(每 100 毫秒 10 条消息)时,我没有注意到任何行为问题。
我是 C# 新手,我想知道这是否可能是由于活动线程池线程的最大数量太少。我在这里读到这个数字可以增加,但是,这不是一个好习惯:ThreadPool.SetMinThreads(Int32, Int32) Method
所以我想知道是否有更好的方法来实现我想要做的事情??
【问题讨论】:
您可以使用 3 个并发队列(或通道)作为消费者服务 3 个单独的任务。如果需要并发处理同类型的item,每个队列可以使用多个Task... 如果处理这些项目(在ExecuteTask
调用中)涉及一些IO,那么您可能会通过使用异步处理获得一些好处。但即使它是纯粹的 CPU 工作——以这样的频率为每个项目运行单独的任务可能不是一个好主意——无论如何你只有这么多的 CPU 内核。
ExecuteTask1
、ExecuteTask2
和 ExecuteTask3
方法在做什么?
您使用的是什么库(例如 Nuget 等)?
在竞争和死锁之后,这是第三个最常见的线程错误,即 firehose 错误。为工作线程产生的工作量超出了它们的处理能力。它最终会使程序在 OutOfMemoryException 上崩溃。需要很长时间,现代机器有很多。然而,在 Debug > Windows > Threads 调试器窗口中很容易观察到线程爆炸。并且该程序的响应能力不那么出色。需要进行节流,可以像计算繁忙工作人员的 SemaphoreSlim 一样简单。
【参考方案1】:
我个人会使用 DataFlow 或 Rx 来帮助分区、排队和管理您的工作负载,而不是启动无休止的任务并破坏任务调度程序。
它们都满足同步和异步操作,可以采用取消令牌,管理并行度,并在需要时为您提供背压。您也可以将其推送到其他管道。
var options = new ExecutionDataflowBlockOptions()
//BoundedCapacity = <= set this if you want back pressure
//CancellationToken = token <= set this if you like cancelling stuff
//MaxDegreeOfParallelism = <= set this if you want limited parallelism
SingleProducerConstrained = true
;
// This could all be done in the one action block,
// or different options for each block depending on your needs
var action1 = new ActionBlock<Message>(ExecuteTask1,options);
var action2 = new ActionBlock<Message>(ExecuteTask2,options);
var action3 = new ActionBlock<Message>(ExecuteTask3,options);
while (true)
var item = zmqSubscriber.ReceiveData(out topic, out ConsumeErrorMsg);
topic switch
topic1 => await action1.SendAsync(ConsumeErrorMsg,token),
topic2 => await action2.SendAsync(ConsumeErrorMsg,token),
topic3 => await action3.SendAsync(ConsumeErrorMsg,token),
;
免责声明:这不是关于 DataFlow 的教程,您需要研究这项技术、审查和调整任何此类解决方案以满足您的需求。
如果您的消息超出您的处理速度,您还应该实施一些限制策略。
【讨论】:
以上是关于使用 Task.Run() 时的延迟的主要内容,如果未能解决你的问题,请参考以下文章
关于 Task.Start() 、 Task.Run() 和 Task.Factory.StartNew() 的使用
Task.Run 和 Task.Factory.StartNew 区别