.Net 任务调度程序行为
Posted
技术标签:
【中文标题】.Net 任务调度程序行为【英文标题】:.Net task scheduler behaviour 【发布时间】:2019-03-08 08:06:16 【问题描述】:我使用 .Net 任务,给出以下代码:
public static async Task TaskSchedulerBehaviour()
var topLevelTasks = Enumerable.Range(0, 5).Select(async n =>
await Task.Delay(50); // THIS LINE MAKES THE DIFFERENCE
var steps = Enumerable.Range(0, 100000);
foreach (var batch in steps.Batch(1000)) /* ".Batch" is contained in MoreLinq */
await Task.WhenAll(batch.Select(async step => await WorkStep(n, step)));
);
await Task.WhenAll(topLevelTasks);
async Task WorkStep(int worker, int step)
if (step % 100 == 0)
Console.WriteLine($"worker=worker, step=step");
await Task.Delay(10);
显示的代码包含一些“大型”***任务,它们完成大量工作(=许多小任务 (WorkStep
);仅调用 Task.Delay
)。
代码中的一行标有注释:如果删除此行,则可能会发生一些***任务排队等待所有其他任务完成的情况。如果其他“***”任务非常密集,他们似乎会挨饿。
另一方面,如果我添加注释行,则行为会好得多:似乎所有***任务执行其子任务的时间或多或少相似。它们同时运行。
为什么会这样?任务调度器不是一个简单的 FIFO 队列或类似的东西吗?
非常感谢
【问题讨论】:
通过添加这一行,您可以使方法的其余部分成为一个延续,如果没有这一行,任务将开始执行直到它返回之前的第一个等待。 “***任务”的所有子任务都使用Task.Delay
,因此会阻塞***任务。
问题出在代码本身。任务调度器调度任务。这不是队列。你不能像队列一样使用它。任务也不是消息。如果您想要 ETL 样式的处理,请使用 ActionBlock 等 TPL 数据流类。这些块包含它们自己的输入/输出缓冲区。它们可以组合成处理管道,每个块都在自己的任务上运行。 BatchBlock 可以批量传入消息。也可以通过设置 MaxDegreeOfParallelism 设置来并行处理多条消息
假设 真正的“大”任务生成许多较小的任务,您可以使用 TransformManyBlock 接收“大”输入并产生多个输出,这些输出将发送到后续处理块
另一种选择是只使用 PLINQ。 PLINQ 和 Parallel.ForEach 都对数据进行分区并将每个批次提供给单独的任务进行处理。这样,每个 CPU 内核都可以 100% 处理数据,而不是尝试同步访问公共队列
【参考方案1】:
假设您正在谈论线程池任务调度程序,它是许多可能的任务调度程序之一......
为什么会这样?任务调度器不是一个简单的 FIFO 队列或类似的东西吗?
There is one shared queue that is generally (not strictly) FIFO, plus each thread pool thread has its own local queue that is generally (not strictly) LIFO。线程池线程如果无事可做,可以从其他线程的本地队列中窃取。
此外,任务调度程序仅用于执行同步代码。 async
/await
的概念是任务调度器之上的一个抽象层次。因此,通过添加await Task.Delay
,您的代码实际上是将一个概念上的async
任务拆分为多个部分,每个部分在适当的时间排队到线程池中。即,第一部分立即排队;当它运行时,它调用Task.Delay
(启动计时器)然后点击await
,导致该部分退出;当计时器关闭时,第二部分立即排队。
对于真实世界的代码,如 Panagiotis 在 cmets 中所述,请考虑使用 TPL Dataflow 进行排队工作。
【讨论】:
我明白了,使用 LIFO 是因为缓存:)!听起来很合理。谢谢你的链接!【参考方案2】:这不是您问题的直接答案。我只是提供一个比直接使用任务更好的替代方案。
您应该使用 Microsoft 的响应式框架(又名 Rx)- NuGet System.Reactive
并添加 using System.Reactive.Linq;
- 然后您可以这样做:
public static async Task TaskSchedulerBehaviour()
var topLevelTasks =
from n in Observable.Range(0, 5)
from batch in Observable.Range(0, 100000).Buffer(1000)
from results in
from step in batch.ToObservable()
from result in Observable.FromAsync(() => WorkStep(n, step))
select result
select results;
await topLevelTasks.ToArray();
async Task WorkStep(int worker, int step)
if (step % 100 == 0)
Console.WriteLine($"worker=worker, step=step");
await Task.Delay(10);
Rx 可以很好地为您处理所有日程安排。
你不得不承认代码看起来也好多了。
【讨论】:
以上是关于.Net 任务调度程序行为的主要内容,如果未能解决你的问题,请参考以下文章
windows任务调度程序和hangfire(或Quartz.net)有啥区别? [关闭]
C#/.NET/.NET Core应用程序编程中实现定时任务调度的方法或者组件有哪些,Timer,FluentScheduler,TaskScheduler,Gofer.NET,Coravel,Qua