使用 Task.Run() 时的延迟

Posted

技术标签:

【中文标题】使用 Task.Run() 时的延迟【英文标题】:Delay when using Task.Run() 【发布时间】:2021-12-10 20:42:56 【问题描述】:

我正在通过 ZMQ 读取数据并根据主题在单独的线程中执行任务。我注意到当数据频率非常高时(每 1ms 约 1 条消息),一些任务需要很长时间才能执行。

这基本上就是我正在做的事情:

while(true)
  item = zmqSubscriber.ReceiveData(out topic, out ConsumeErrorMsg);
  if (topic.Equals(topic1))
     
      Task.Run(() => ExecuteTask1(item));
     
     else if (topic.Equals(topic2)
     
      Task.Run(() => ExecuteTask2(item));
     
     else if (topic.Equals(topic3))
     
      Task.Run(() => ExecuteTask3(item));

     

当数据频率稍低(每 100 毫秒 10 条消息)时,我没有注意到任何行为问题。

我是 C# 新手,我想知道这是否可能是由于活动线程池线程的最大数量太少。我在这里读到这个数字可以增加,但是,这不是一个好习惯:ThreadPool.SetMinThreads(Int32, Int32) Method

所以我想知道是否有更好的方法来实现我想要做的事情??

【问题讨论】:

您可以使用 3 个并发队列(或通道)作为消费者服务 3 个单独的任务。如果需要并发处理同类型的item,每个队列可以使用多个Task... 如果处理这些项目(在ExecuteTask 调用中)涉及一些IO,那么您可能会通过使用异步处理获得一些好处。但即使它是纯粹的 CPU 工作——以这样的频率为每个项目运行单独的任务可能不是一个好主意——无论如何你只有这么多的 CPU 内核。 ExecuteTask1ExecuteTask2ExecuteTask3 方法在做什么? 您使用的是什么库(例如 Nuget 等)? 在竞争和死锁之后,这是第三个最常见的线程错误,即 firehose 错误。为工作线程产生的工作量超出了它们的处理能力。它最终会使程序在 OutOfMemoryException 上崩溃。需要很长时间,现代机器有很多。然而,在 Debug > Windows > Threads 调试器窗口中很容易观察到线程爆炸。并且该程序的响应能力不那么出色。需要进行节流,可以像计算繁忙工作人员的 SemaphoreSlim 一样简单。 【参考方案1】:

我个人会使用 DataFlowRx 来帮助分区、排队和管理您的工作负载,而不是启动无休止的任务并破坏任务调度程序。

它们都满足同步和异步操作,可以采用取消令牌,管理并行度,并在需要时为您提供背压。您也可以将其推送到其他管道。

var options = new ExecutionDataflowBlockOptions()

   //BoundedCapacity = <= set this if you want back pressure
   //CancellationToken = token <= set this if you like cancelling stuff
   //MaxDegreeOfParallelism = <= set this if you want limited parallelism
   SingleProducerConstrained = true
;

// This could all be done in the one action block,
// or different options for each block depending on your needs
var action1 = new ActionBlock<Message>(ExecuteTask1,options);
var action2 = new ActionBlock<Message>(ExecuteTask2,options);
var action3 = new ActionBlock<Message>(ExecuteTask3,options);

while (true)

   var item = zmqSubscriber.ReceiveData(out topic, out ConsumeErrorMsg);
   topic switch
   
      topic1 =>  await action1.SendAsync(ConsumeErrorMsg,token),
      topic2 =>  await action2.SendAsync(ConsumeErrorMsg,token),
      topic3 =>  await action3.SendAsync(ConsumeErrorMsg,token),  
   ;

免责声明:这不是关于 DataFlow 的教程,您需要研究这项技术、审查和调整任何此类解决方案以满足您的需求。

如果您的消息超出您的处理速度,您还应该实施一些限制策略。

【讨论】:

以上是关于使用 Task.Run() 时的延迟的主要内容,如果未能解决你的问题,请参考以下文章

关于 Task.Start() 、 Task.Run() 和 Task.Factory.StartNew() 的使用

Task.Run 和 Task.Factory.StartNew 区别

我应该使用 Task.Run 还是 Task.FromResult?

使用 Task.Run() 时如何限制最大线程数?

Task.Run使用默认线程池

使用 Task.Run() 时如何避免 OutOfMemoryException? [复制]