不遵守链接的 ActionBlock 的 BoundedCapacity
Posted
技术标签:
【中文标题】不遵守链接的 ActionBlock 的 BoundedCapacity【英文标题】:BoundedCapacity of linked ActionBlock is not respected 【发布时间】:2021-09-20 14:01:09 【问题描述】:我有一个包含两个步骤的顺序管道。
(简化示例)
第一步只是将输入数字加 1000。 第二步只是显示数字。
var transformBlock = new TransformBlock<int, long>(StepOne, new ExecutionDataflowBlockOptions
MaxDegreeOfParallelism = 1,
BoundedCapacity = DataflowBlockOptions.Unbounded,
);
var actionBlock = new ActionBlock<long>(StepTwo, new ExecutionDataflowBlockOptions
MaxDegreeOfParallelism = 1,
BoundedCapacity = 2,
);
transformBlock.LinkTo(actionBlock, new DataflowLinkOptions
PropagateCompletion = true
);
for (int i = 0; i < 100; i++)
transformBlock.Post(i);
static async Task<long> StepOne(int item)
await Task.Delay(500);
Console.WriteLine("transforming: " + item);
return (long)item + 1000;
static async Task StepTwo(long item)
await Task.Delay(1000);
Console.WriteLine("final product: " + item);
由于第 2 步花费的时间比第 1 步长,我预计第 1 步会在一段时间后进行节流,因为它无法将结果发送到第 2 步的有界缓冲区。
预期输出: 转型:0 转型:1 最终产品:1000 转型:2 最终产品:1001 转型:3 最终产品:1002 转型:4 最终产品:1003 ...
实际输出: 转型:0 转型:1 最终产品:1000 转型:2 转型:3 最终产品:1001 转型:4 转型:5 最终产品:1002 转型:6 转型:7 最终产品:1003 ...
【问题讨论】:
【参考方案1】:TransformBlock
在内部维护两个队列,一个输入队列和一个输出队列。这两个队列的大小可以通过InputCount
和OutputCount
属性随时监控。这两个队列的累加大小由BoundedCapacity
选项配置,所以总和InputCount
+OutputCount
总是小于等于BoundedCapacity
的值。在您的情况下,块的BoundedCapacity
是Unbounded
,因此对于这两个队列可以变得多大没有限制因素(可能除了像Int32.MaxValue
这样的一些硬限制)。链接的ActionBlock
具有有限的有界容量这一事实几乎是无关紧要的,除了延迟将转换后的值从TransformBlock
的输出队列传输到ActionBlock
的输入队列之外,没有任何后果。仅当您监视源块的OutputCount
属性和目标块的InputCount
属性时,才能观察到此结果。 TransformBlock
是否没有链接到任何目标块甚至都没有关系。它会很高兴地继续自己处理数字,直到达到某个硬限制,或者机器的内存耗尽。
【讨论】:
这很有意义。您是否知道我必须链接哪些块才能获得所需的输出? @user3357878 我的理解是,只有使用BoundedCapacity
配置管道中的所有块才有意义,或者没有。如果所有的块都是有界的,那么可能会引入backpreasure,这意味着管道的馈线可能会被阻塞。 transformBlock.Post(i);
不再保证返回true
,这通常意味着您必须切换到异步await transformBlock.SendAsync(i);
。
将每个块限制为相同的容量并使用 SendAsync(为了不丢弃物品)确实可以解决这个问题。但是想象一下,我的第一步将一些轻量级项目(例如 ID)排入队列,而第二步则下载图像。这意味着我不会关心由第一步中排队的数千个整数引起的背压,并使其成为一个合理的用例?
@user3357878 是的,这当然是一个有趣的用例。本质上,您需要一个TransformBlock
,并为其每个输入和输出队列配置一个单独的BoundedCapacity
。 AFAIK 没有这样的内置块,但您可以考虑在管道的开头添加一个BufferBlock
,容量不受限制,并限制下游所有链接块的容量。以上是关于不遵守链接的 ActionBlock 的 BoundedCapacity的主要内容,如果未能解决你的问题,请参考以下文章
如何发布到 BufferBlock 并从 ActionBlock 中获取结果?
TPL Dataflow:ActionBlock,避免在每次调用其委托时重复运行 using-block(例如用于写入 StreamWriter)