当消费者不堪重负时,如何让快速生产者暂停?
Posted
技术标签:
【中文标题】当消费者不堪重负时,如何让快速生产者暂停?【英文标题】:How to make fast producer paused when consumer is overwhelmed? 【发布时间】:2016-12-20 11:35:19 【问题描述】:我在我的应用中实现了使用 TPL 数据流的生产者/消费者模式。我有一个包含大约 40 个块的大数据流网格。网格中有两个主要的功能部分:生产者部分和消费者部分。生产者应该不断地为消费者提供大量工作,而消费者有时会缓慢地处理传入的工作。当消费者忙于某些指定数量的工作项时,我想暂停生产者。否则应用程序会消耗大量内存/CPU 并且行为不可持续。
我制作了演示该问题的演示应用程序:
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace DataflowTest
class Program
static void Main(string[] args)
var options = new ExecutionDataflowBlockOptions
MaxDegreeOfParallelism = 4,
EnsureOrdered = false
;
var boundedOptions = new ExecutionDataflowBlockOptions
MaxDegreeOfParallelism = 4,
EnsureOrdered = false,
BoundedCapacity = 5
;
var bufferBlock = new BufferBlock<int>(boundedOptions);
var producerBlock = new TransformBlock<int, int>(x => x + 1, options);
var broadcastBlock = new BroadcastBlock<int>(x => x, options);
var consumerBlock = new ActionBlock<int>(async x =>
var delay = 1000;
if (x > 10) delay = 5000;
await Task.Delay(delay);
Console.WriteLine(x);
, boundedOptions);
producerBlock.LinkTo(bufferBlock);
bufferBlock.LinkTo(broadcastBlock);
broadcastBlock.LinkTo(producerBlock);
broadcastBlock.LinkTo(consumerBlock);
bufferBlock.Post(1);
consumerBlock.Completion.Wait();
应用程序打印如下内容:
2
1
3
4
5
69055
69053
69054
69057
438028
438040
142303
438079
这意味着生产者不断旋转并将消息推送给消费者。我希望它暂停并等到消费者完成当前部分工作,然后生产者应该继续为消费者提供消息。
我的问题与其他question 类似,但没有得到正确回答。我尝试了该解决方案,但它在这里不起作用,允许生产者向消费者发送消息。同样设置BoundedCapacity
也不起作用。
到目前为止,我认为唯一的解决方案是制作我自己的块来监控目标块队列并根据目标块的队列进行操作。但我希望这对于这个问题来说有点过头了。
【问题讨论】:
您是否考虑过改用Rx
?看看这个答案:***.com/questions/2542764/tpl-vs-reactive-framework
我希望不需要这个,因为在 Dataflow 上花费了很多时间,它非常适合我的需求。
在您的演示中,生产者可以自己生成所有消息,而无需通过广播块接收自己的消息。你的真实代码也是这样吗,还是需要生产者→生产者周期?
@svick 但是如果我将生产者链接到自身,消费者将如何从生产者那里获取数据?真正的生产者周期由一堆块组成,它们实际加载顺序评论页面、解析页面并将 cmets 传递给消费者。
@kseen 我在问广播→制作人链接是否必要,或者是否可以避免。
【参考方案1】:
如果您需要保持生产者→缓冲区→广播周期完整,那么您需要将广播块替换为仍然广播它接收到的消息的其他块,但在其目标之一已满时等待。
只要您在创建该块时知道该块的目标,就可以使用ActionBlock
(复制自another answer of mine 的代码)来构建它:
public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
DataflowBlockOptions options, params ITargetBlock<T>[] targets)
var block = new ActionBlock<T>(
async item =>
foreach (var target in targets)
await target.SendAsync(item);
, new ExecutionDataflowBlockOptions
BoundedCapacity = options.BoundedCapacity,
CancellationToken = options.CancellationToken
);
block.Completion.ContinueWith(task =>
foreach (var target in targets)
if (task.Exception != null)
target.Fault(task.Exception);
else
target.Complete();
);
return block;
使用这个,你可以声明广播块:
var broadcastBlock = CreateGuaranteedBroadcastBlock(
boundedOptions, producerBlock, consumerBlock);
(您还需要删除与broadcastBlock
链接的LinkTo
行。)
您的原始代码的一个问题是完成,但这是 TPL 数据流中的一个难题,通常有周期。
【讨论】:
关于完成,如果我的网络是连续的怎么办?就像将来没有任何完成一样,它应该在应用程序运行时继续运行。 我刚刚在我的演示应用程序中尝试了这个GuaranteedBroadcastBlock
,它就像一个魅力!完美的!非常感谢。
这是最好的情况:你不需要完成,所以它不起作用。【参考方案2】:
看起来你的生产者生成了一个序列,所以不需要整个生产者→缓冲区→广播循环。相反,所有三个块都可以替换为 async
循环,该循环生成下一个项目,然后使用 await SendAsync()
将其发送给消费者:
Task.Run(async () =>
int i = 1;
while (true)
await consumerBlock.SendAsync(i);
i++;
consumerBlock.Complete();
);
这样,一旦消费者达到其容量,await SendAsync()
将确保生产者等到消费者消费完一个项目。
如果您想将此类生产者封装到数据流块中,以便您可以例如将其链接到消费者you can。
【讨论】:
我真正的“生产者”是一组加载 cmets 页面(包含指向下一个 cmets 页面的链接)的块,解析当前评论页面的内容,将 cmets 发送给消费者并启动这个循环再次将下一个评论页面的地址传递给这个生产者循环中的第一个块。所以,不幸的是,这不仅仅是一个序列。这就像链接序列,其中序列中的每个元素都有到其中下一个元素的地址,而序列中的最后一个元素没有下一个元素的地址。抱歉这个问题太简单了。 我刚刚制作了更好地代表真实情况的图表。给你:imgur.com/iEklfeG以上是关于当消费者不堪重负时,如何让快速生产者暂停?的主要内容,如果未能解决你的问题,请参考以下文章