不遵守链接的 ActionBlock 的 BoundedCapacity

Posted

技术标签:

【中文标题】不遵守链接的 ActionBlock 的 BoundedCapacity【英文标题】:BoundedCapacity of linked ActionBlock is not respected 【发布时间】:2021-09-20 14:01:09 【问题描述】:

我有一个包含两个步骤的顺序管道。

(简化示例)

第一步只是将输入数字加 1000。 第二步只是显示数字。

var transformBlock = new TransformBlock<int, long>(StepOne, new ExecutionDataflowBlockOptions

      MaxDegreeOfParallelism = 1,
      BoundedCapacity = DataflowBlockOptions.Unbounded,
);
var actionBlock = new ActionBlock<long>(StepTwo, new ExecutionDataflowBlockOptions

      MaxDegreeOfParallelism = 1,
      BoundedCapacity = 2,
);
transformBlock.LinkTo(actionBlock, new DataflowLinkOptions

    PropagateCompletion = true
);

for (int i = 0; i < 100; i++)

    transformBlock.Post(i);


static async Task<long> StepOne(int item)

    await Task.Delay(500);
    Console.WriteLine("transforming: " + item);
    return (long)item + 1000;


static async Task StepTwo(long item)

    await Task.Delay(1000);
    Console.WriteLine("final product: " + item);

由于第 2 步花费的时间比第 1 步长,我预计第 1 步会在一段时间后进行节流,因为它无法将结果发送到第 2 步的有界缓冲区。

预期输出: 转型:0 转型:1 最终产品:1000 转型:2 最终产品:1001 转型:3 最终产品:1002 转型:4 最终产品:1003 ...

实际输出: 转型:0 转型:1 最终产品:1000 转型:2 转型:3 最终产品:1001 转型:4 转型:5 最终产品:1002 转型:6 转型:7 最终产品:1003 ...

【问题讨论】:

【参考方案1】:

TransformBlock 在内部维护两个队列,一个输入队列和一个输出队列。这两个队列的大小可以通过InputCountOutputCount 属性随时监控。这两个队列的累加大小由BoundedCapacity选项配置,所以总和InputCount+OutputCount总是小于等于BoundedCapacity的值。在您的情况下,块的BoundedCapacityUnbounded,因此对于这两个队列可以变得多大没有限制因素(可能除了像Int32.MaxValue 这样的一些硬限制)。链接的ActionBlock 具有有限的有界容量这一事实几乎是无关紧要的,除了延迟将转换后的值从TransformBlock 的输出队列传输到ActionBlock 的输入队列之外,没有任何后果。仅当您监视源块的OutputCount 属性和目标块的InputCount 属性时,才能观察到此结果。 TransformBlock 是否没有链接到任何目标块甚至都没有关系。它会很高兴地继续自己处理数字,直到达到某个硬限制,或者机器的内存耗尽。

【讨论】:

这很有意义。您是否知道我必须链接哪些块才能获得所需的输出? @user3357878 我的理解是,只有使用BoundedCapacity 配置管道中的所有块才有意义,或者没有。如果所有的块都是有界的,那么可能会引入backpreasure,这意味着管道的馈线可能会被阻塞。 transformBlock.Post(i); 不再保证返回true,这通常意味着您必须切换到异步await transformBlock.SendAsync(i); 将每个块限制为相同的容量并使用 SendAsync(为了不丢弃物品)确实可以解决这个问题。但是想象一下,我的第一步将一些轻量级项目(例如 ID)排入队列,而第二步则下载图像。这意味着我不会关心由第一步中排队的数千个整数引起的背压,并使其成为一个合理的用例? @user3357878 是的,这当然是一个有趣的用例。本质上,您需要一个TransformBlock,并为其每个输入和输出队列配置一个单独的BoundedCapacity。 AFAIK 没有这样的内置块,但您可以考虑在管道的开头添加一个BufferBlock,容量不受限制,并限制下游所有链接块的容量。

以上是关于不遵守链接的 ActionBlock 的 BoundedCapacity的主要内容,如果未能解决你的问题,请参考以下文章

如果我没有完成一个 ActionBlock 会不会有问题

如何发布到 BufferBlock 并从 ActionBlock 中获取结果?

TPL Dataflow:ActionBlock,避免在每次调用其委托时重复运行 using-block(例如用于写入 StreamWriter)

UITableViewCell 子视图不遵守自动布局约束

具有延迟的 TPL 数据流队列

Amazon CloudFront 不遵守我的 S3 网站存储桶的 index.html 规则