意外行为 - TPL DataFlow BatchBlock 在 TriggerBatch 执行时拒绝项目

Posted

技术标签:

【中文标题】意外行为 - TPL DataFlow BatchBlock 在 TriggerBatch 执行时拒绝项目【英文标题】:Unexpected Behaviour - TPL DataFlow BatchBlock Rejects items while TriggerBatch is executing 【发布时间】:2016-06-08 04:57:58 【问题描述】:

当您创建具有有限容量的批处理块并在(并行)发布新项目时调用 triggerBatch - 在触发器批处理执行期间发布新项目将失败。

调用触发器批处理(每X次)是为了确保数据在块中不会延迟太久,以防传入数据流暂停或减慢。

以下代码将输出一些“失败后”事件。 例如:

    public static void Main(string[] args)
    
        var batchBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions()  BoundedCapacity = 10000000 );
        var actionBlock = new ActionBlock<int[]>(x => ProcessBatch(x), new ExecutionDataflowBlockOptions()  MaxDegreeOfParallelism = 1 );
        batchBlock.LinkTo(actionBlock);

        var producerTask = Task.Factory.StartNew(() =>
        
            //Post 10K Items
            for (int i = 0; i < 10000; i++)
            
                var postResult = batchBlock.Post(i);
                if (!postResult)
                    Console.WriteLine("Failed to Post");
            
        );

        var triggerBatchTask = Task.Factory.StartNew(() =>
                                
                //Trigger Batch..
                for (int i = 0; i < 1000000; i++)
                    batchBlock.TriggerBatch();
            );

        producerTask.Wait();
        triggerBatchTask.Wait();
    

    public static void ProcessBatch(int[] batch)
    
        Console.WriteLine("0 - 1", batch.First(), batch.Last());
    

*请注意,这种情况只有在 batchBlock 有界时才能重现。

是我遗漏了什么还是 batchBlock 有问题?

【问题讨论】:

有些相关:How to call TriggerBatch automagically after a timeout if the number of queued items is less than the BatchSize? 【参考方案1】:

BatchBlock 并没有真正拒绝该项目,它试图推迟它。除了在Post() 的情况下,推迟不是一个选项。解决此问题的一种简单方法是使用await batchBlock.SendAsync(i) 而不是batchBlock.Post(i)(这也意味着您需要将Task.Factory.StartNew(() =&gt; 更改为Task.Run(async () =&gt;)。

为什么会这样?根据the source code,如果BatchBlock 是有界的,TriggerBatch() 将被异步处理,并且在处理过程中,不会接受任何新项目。

在任何情况下,您都不应该期望Post() 在有界块上总是返回true,如果块已满,Post() 也将返回false

【讨论】:

同时我正在使用不同的解决方案,通过引入另一个将接受失败的块,最终我在两个块上以串行方式调用 triggerbatch。根据您的建议解决方案 - await 和 async 将创建一个任务来处理每个传入项目,当您有大量事件时,这可能会导致内存不足问题,许多任务将被无限创建。 @AlYaros 不,不会。如果该项目被接受,你会得到一个缓存的Task,所以那里没有分配。如果项目被推迟,您显示的代码在被接受之前不会添加新项目。如果在您的实际代码中 await 会导致问题,那么 IMO 要么您应该能够修复它们,否则即使没有它您也会遇到问题。 顺便说一句,感谢您的 cmets :) 我不确定这在任务内存消耗方面是否绝对安全。我将查看您建议的源代码并进行测试有点。任务不是在block code执行之前创建的,不管post结果如何? 我也希望 Post 会被接受,如果没有达到限制容量,不管 TriggerBatch 处理。

以上是关于意外行为 - TPL DataFlow BatchBlock 在 TriggerBatch 执行时拒绝项目的主要内容,如果未能解决你的问题,请参考以下文章

使用 TPL-Dataflow 进行聚合和连接(内、外、左……)?

TPL-Dataflow 是不是适用于高并发应用程序?

TPL Dataflow BufferBlock 线程安全吗?

TPL Dataflow 如何与“全局”数据同步

TPL Dataflow ,完成一个 Block ,重新创建一个 BLock

TPL DataFlow 一对一处理