意外行为 - TPL DataFlow BatchBlock 在 TriggerBatch 执行时拒绝项目
Posted
技术标签:
【中文标题】意外行为 - TPL DataFlow BatchBlock 在 TriggerBatch 执行时拒绝项目【英文标题】:Unexpected Behaviour - TPL DataFlow BatchBlock Rejects items while TriggerBatch is executing 【发布时间】:2016-06-08 04:57:58 【问题描述】:当您创建具有有限容量的批处理块并在(并行)发布新项目时调用 triggerBatch - 在触发器批处理执行期间发布新项目将失败。
调用触发器批处理(每X次)是为了确保数据在块中不会延迟太久,以防传入数据流暂停或减慢。
以下代码将输出一些“失败后”事件。 例如:
public static void Main(string[] args)
var batchBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions() BoundedCapacity = 10000000 );
var actionBlock = new ActionBlock<int[]>(x => ProcessBatch(x), new ExecutionDataflowBlockOptions() MaxDegreeOfParallelism = 1 );
batchBlock.LinkTo(actionBlock);
var producerTask = Task.Factory.StartNew(() =>
//Post 10K Items
for (int i = 0; i < 10000; i++)
var postResult = batchBlock.Post(i);
if (!postResult)
Console.WriteLine("Failed to Post");
);
var triggerBatchTask = Task.Factory.StartNew(() =>
//Trigger Batch..
for (int i = 0; i < 1000000; i++)
batchBlock.TriggerBatch();
);
producerTask.Wait();
triggerBatchTask.Wait();
public static void ProcessBatch(int[] batch)
Console.WriteLine("0 - 1", batch.First(), batch.Last());
*请注意,这种情况只有在 batchBlock 有界时才能重现。
是我遗漏了什么还是 batchBlock 有问题?
【问题讨论】:
有些相关:How to call TriggerBatch automagically after a timeout if the number of queued items is less than the BatchSize? 【参考方案1】:BatchBlock
并没有真正拒绝该项目,它试图推迟它。除了在Post()
的情况下,推迟不是一个选项。解决此问题的一种简单方法是使用await batchBlock.SendAsync(i)
而不是batchBlock.Post(i)
(这也意味着您需要将Task.Factory.StartNew(() =>
更改为Task.Run(async () =>
)。
为什么会这样?根据the source code,如果BatchBlock
是有界的,TriggerBatch()
将被异步处理,并且在处理过程中,不会接受任何新项目。
在任何情况下,您都不应该期望Post()
在有界块上总是返回true
,如果块已满,Post()
也将返回false
。
【讨论】:
同时我正在使用不同的解决方案,通过引入另一个将接受失败的块,最终我在两个块上以串行方式调用 triggerbatch。根据您的建议解决方案 - await 和 async 将创建一个任务来处理每个传入项目,当您有大量事件时,这可能会导致内存不足问题,许多任务将被无限创建。 @AlYaros 不,不会。如果该项目被接受,你会得到一个缓存的Task
,所以那里没有分配。如果项目被推迟,您显示的代码在被接受之前不会添加新项目。如果在您的实际代码中 await
会导致问题,那么 IMO 要么您应该能够修复它们,否则即使没有它您也会遇到问题。
顺便说一句,感谢您的 cmets :) 我不确定这在任务内存消耗方面是否绝对安全。我将查看您建议的源代码并进行测试有点。任务不是在block code执行之前创建的,不管post结果如何?
我也希望 Post 会被接受,如果没有达到限制容量,不管 TriggerBatch 处理。以上是关于意外行为 - TPL DataFlow BatchBlock 在 TriggerBatch 执行时拒绝项目的主要内容,如果未能解决你的问题,请参考以下文章
使用 TPL-Dataflow 进行聚合和连接(内、外、左……)?
TPL Dataflow BufferBlock 线程安全吗?