在实现生产者/消费者模式时使用 Task.Yield 克服 ThreadPool 饥饿

Posted

技术标签:

【中文标题】在实现生产者/消费者模式时使用 Task.Yield 克服 ThreadPool 饥饿【英文标题】:Using Task.Yield to overcome ThreadPool starvation while implementing producer/consumer pattern 【发布时间】:2019-04-15 05:43:44 【问题描述】:

回答问题:Task.Yield - real usages? 我建议使用 Task.Yield 允许池线程被其他任务重用。在这样的模式下:

  CancellationTokenSource cts;
  void Start()
  
        cts = new CancellationTokenSource();

        // run async operation
        var task = Task.Run(() => SomeWork(cts.Token), cts.Token);
        // wait for completion
        // after the completion handle the result/ cancellation/ errors
    

    async Task<int> SomeWork(CancellationToken cancellationToken)
    
        int result = 0;

        bool loopAgain = true;
        while (loopAgain)
        
            // do something ... means a substantial work or a micro batch here - not processing a single byte

            loopAgain = /* check for loop end && */  cancellationToken.IsCancellationRequested;
            if (loopAgain) 
                // reschedule  the task to the threadpool and free this thread for other waiting tasks
                await Task.Yield();
            
        
        cancellationToken.ThrowIfCancellationRequested();
        return result;
    

    void Cancel()
    
        // request cancelation
        cts.Cancel();
    

但一位用户写道

我不认为使用 Task.Yield 来克服 ThreadPool 饥饿,而 实施生产者/消费者模式是个好主意。我建议你 如果您想详细说明原因,请提出一个单独的问题。

任何人都知道,为什么不是一个好主意?

【问题讨论】:

我对原始评论者的动机没有确定的想法,但您应该尽量避免等待数据到达的繁忙循环,而是应该使用允许您触发处理的机制。 我认为热循环是不好的有或没有将异步添加到混合中 - 如果它是 await Task.Delay(50) 或其他东西,我会原谅它更多,但是:使用异步激活而不是这样检查会更好;有新的“通道”API,例如 (nuget.org/packages/System.Threading.Channels) - 设计 用于异步生产者/消费者场景 @MaximT 确实 - 这是我在 SE.Redis 中用于有序消息队列的东西:github.com/StackExchange/StackExchange.Redis/blob/master/src/… 这与线程池管理器试图做的事情完全相反。它努力将活动 tp 线程的数量限制在理想数量,以减少上下文切换开销。当您使用 Task.Yield 时,您会添加上下文切换开销。如果您有太多无法有效执行代码的 tp 线程(阻塞太多),请使用 SetMinThreads()。 嗯,当然它必须这样工作。再多的负担得起的钱也不会为您购买具有一千个处理器内核的机器。你不能用这么多的工作来抨击线程池并期待即时的魔法。这些是属于问题的重要细节。 【参考方案1】:

对于您的问题,cmets 中还有一些优点。作为您引用的用户,我只想总结一下:为工作使用正确的工具。

使用ThreadPool 感觉不是执行多个连续 CPU 密集型任务的正确工具,即使您尝试通过将它们转换为状态机来组织一些协作执行,从而使用await Task.Yield() 相互产生 CPU 时间.线程切换相当昂贵;通过在紧密循环上执行await Task.Yield(),您会增加大量开销。此外,您永远不应该接管整个 ThreadPool,因为 .NET 框架(和底层操作系统进程)可能需要它来处理其他事情。在相关的说明中,TPL 甚至具有 TaskCreationOptions.LongRunning 选项,该选项请求不在 ThreadPool 线程上运行任务(相反,它在幕后创建带有 new Thread() 的普通线程)。

也就是说,使用 custom TaskScheduler 并在一些专用的池外线程上使用有限的并行度,这些线程对单个长时间运行的任务具有线程亲和力可能是不同的东西。至少,await 延续将发布在同一个线程上,这应该有助于减少切换开销。这让我想起了我不久前尝试用ThreadAffinityTaskScheduler 解决的另一个问题。

不过,根据特定情况,使用现有的成熟且经过测试的工具通常会更好。仅举几例:Parallel Class、TPL Dataflow、System.Threading.Channels、Reactive Extensions。

还有一整***有的工业级解决方案来处理发布-订阅模式(RabbitMQ、PubNub、Redis、Azure 服务总线、Firebase 云消息传递 (FCM)、亚马逊简单队列服务 (SQS) 等)。

【讨论】:

我知道所有完善的解决方案 - 我自己使用了其中的一些。在性能方面,Kafka 是这个范围内最好的,NATS 也是。但是性能提升通常是以牺牲可靠性为代价的。对于可靠的消息处理,需要从持久存储中读取它们并且不要将它们缓冲在内存中。而且这些任务通常不是处理单个字节那么简单,而是通常需要几毫秒。我通常将 WorkStealingTaskScheduler 用于长时间的 CPU 绑定任务(它有一个自定义线程池)。所以这一切都取决于它使用的上下文。 答案中的关键 - 不要在紧密循环中使用 Task.Yield。我同意 100%。但是,如果处理每次迭代所花费的时间超过额外的 ThreadPool.QueueUserWorkItem,那么性能下降可以忽略不计,但会增加响应能力和任务协作。顺便说一句 - 用一个小的自定义设置很容易测试。在一个紧密的循环中,减少了大约 100%,但如果在每次迭代中完成了一些工作(大约几毫秒) - 那么减少量小于 10%。 @MaximT,无论如何我都不会用一百万个小型计算任务来超载默认的ThreadPool。但是假设您有一个自定义池,例如您使用WorkStealingTaskScheduler 创建了一个。看看实际的基准会很有趣。例如,有 10000 个任务,每个任务计算 Pi 号的前 10000 位。然后将其与ThreadPoolTaskScheduler 进行比较(确保使用SetMinThreads/SetMaxThreads 固定线程数)。然后将其与具有实际线程亲和性的任务调度程序进行比较(AFAIR,WorkStealingTaskScheduler 不适用于 await 延续)。 我的经验是性能杀手不是上下文切换,而是上下文切换。如果计算 PI 编号的工作很短,那么将 10000 次迭代批处理到一个 ThreadPool.QueueUserWorkItem 将解决问题。 WorkStealingTaskScheduler 用于较长的 CPU 绑定同步任务,其中没有异步等待。看看github.com/BBGONE/REBUS-TaskCoordinator/blob/master/Showdown/… 混合作业是如何拆分成子任务的(HandleLongRunMessage 方法)。 [上下文切换的价格] = [上下文切换时长] / ([作业时长]+[上下文切换时长])。工作越短,价格就越高。对于太长的工作 - 线程池无论如何都不是解决方案。 ThreadAffinityTaskScheduler 的缺点是它不能移植到 NET.Core - 它依赖于平台。这个问题可以通过微批处理来解决——批处理将在单个线程上处理。【参考方案2】:

在与其他用户就这个问题进行了一番辩论后——他们担心上下文切换及其对性能的影响。 我明白他们在担心什么。

但我的意思是:在循环内做一些事情...... 是一项实质性任务 - 通常以消息处理程序的形式从队列中读取消息并处理它。消息处理程序通常是用户定义的,消息总线使用某种调度程序执行它们。用户可以实现一个同步执行的处理程序(没有人知道用户会做什么),并且没有 Task.Yield 会阻塞线程以循环处理这些同步任务。

不要说空话,我在 github 上添加了测试:https://github.com/BBGONE/TestThreadAffinity 他们将 ThreadAffinityTaskScheduler、.NET ThreadScheduler 与 BlockingCollection 以及 .NET ThreadScheduler 与 Threading.Channels 进行比较。

测试表明,对于超短作业,性能下降是 约 15%。要使用 Task.Yield 而不会降低性能(即使很小) - 不要使用极短的任务,如果任务太短,则将较短的任务组合成更大的批次。

[上下文切换的价格] = [上下文切换时长] / ([作业时长]+[上下文切换时长])

在这种情况下,切换任务对性能的影响可以忽略不计。但它增加了系统更好的任务协作和响应能力。

对于长时间运行的任务,最好使用自定义调度程序,它在自己的专用线程池上执行任务 - (如 WorkStealingTaskScheduler)。

对于混合作业 - 可以包含不同的部分 - 短期运行的 CPU 限制、异步和长时间运行的代码部分。最好将任务拆分为子任务。

private async Task HandleLongRunMessage(TestMessage message, CancellationToken token = default(CancellationToken))
 
            // SHORT SYNCHRONOUS TASK - execute as is on the default thread (from thread pool)
            CPU_TASK(message, 50);
            // IO BOUND ASYNCH TASK - used as is
            await Task.Delay(50);
            // BUT WRAP the LONG SYNCHRONOUS TASK inside the Task 
            // which is scheduled on the custom thread pool 
            // (to save threadpool threads)
            await Task.Factory.StartNew(() => 
                CPU_TASK(message, 100000);
            , token, TaskCreationOptions.DenyChildAttach, _workStealingTaskScheduler);

【讨论】:

频道信息:github.com/stephentoub/corefxlab/blob/master/src/… 看来我想通了。尽管在所有用法中性能相同,但通道具有非阻塞等待何时可以写入通道的好处。这发生在有界场景中。 BlockingCollection 阻塞线程 - 不允许写入,Channel 将线程留给其他人使用。 我将 Threading.Channels 的测试移植到 CoreFX(而不是完整的 Net Framework)——它开始工作的速度提高了 2.5 倍。现在我的电脑上每秒有超过 100 万条消息。我将此解决方案添加到测试中。他们真的很好。

以上是关于在实现生产者/消费者模式时使用 Task.Yield 克服 ThreadPool 饥饿的主要内容,如果未能解决你的问题,请参考以下文章

一、多线程下生产者消费者模式

python实现生产者和消费者模式

用阻塞队列实现生产者消费者模式一(单线程消费)

异步简析之BlockingCollection实现生产消费模式

使用管程实现生产者消费者模式

java 多线程并发系列之 生产者消费者模式的两种实现