具有有限容量的转换块中的 TPL 数据流异常

Posted

技术标签:

【中文标题】具有有限容量的转换块中的 TPL 数据流异常【英文标题】:TPL Dataflow exception in transform block with bounded capacity 【发布时间】:2014-03-03 10:18:11 【问题描述】:

我需要构建将处理大量消息的 TPL 数据流管道。因为有很多消息我不能简单地将Post 它们放入BufferBlock 的无限队列中,否则我将面临内存问题。所以我想使用BoundedCapacity = 1 选项来禁用队列并使用MaxDegreeOfParallelism 来使用并行任务处理,因为我的TransformBlocks 可能需要一些时间来处理每条消息。我还使用PropagateCompletion 完成所有操作,但无法沿管道传播。

但是当错误发生在第一条消息之后,我面临错误处理问题:调用await SendAsync 只需将我的应用程序切换到无限等待。

我已将案例简化为示例控制台应用程序:

var data_buffer = new BufferBlock<int>(new DataflowBlockOptions

    BoundedCapacity = 1
);

var process_block = new ActionBlock<int>(x =>

    throw new InvalidOperationException();
, new ExecutionDataflowBlockOptions

    MaxDegreeOfParallelism = 2,
    BoundedCapacity = 1
);

data_buffer.LinkTo(process_block,
    new DataflowLinkOptions  PropagateCompletion = true );

for (var k = 1; k <= 5; k++)

    await data_buffer.SendAsync(k);
    Console.WriteLine("Send: 0", k);


data_buffer.Complete();

await process_block.Completion;

【问题讨论】:

附带说明,使用小于MaxDegreeOfParallelismBoundedCapacity 配置块将降低并行度到容量值。换句话说,如果只允许缓存一项,则该块不能同时处理两项。我相信会发生这种情况,因为在处理完这两个项目后,它应该将两个结果存储在它的输出缓冲区中,并且它没有用于两个结果的可用空间。 可能是,是的。但这至少对我来说并不直观。我认为“缓冲区”意味着溢出的一切。因此,如果我们有 2 个工作人员和 1 个缓冲区容量,它会获得 2 个项目并将它们提供给每个工作人员并“提前”获得 1 个项目。 关于ActionBlock 那么是的,这是有道理的,因为这个块只有一个没有输出的输入队列。但实际上,出于某种原因,即使ActionBlocks 也受相同规则的约束。可能是为了一致性。 【参考方案1】:

这是预期的行为。如果“下游”存在故障,则该错误不会“向后”传播到网格上。网格希望您检测到该故障(例如,通过process_block.Completion)并解决它。

如果您想向后传播错误,您可以在process_block.Completion 上使用faults 如果下游块出现故障,则在faults 上继续。

请注意,这不是唯一可能的解决方案;您可能想要重建网格的该部分或将源链接到替代目标。源块没有出现故障,因此它们可以继续使用已修复的网格进行处理。

【讨论】:

那么,我应该如何等待BufferBlock 再次可用以发送数据?在数据排队结束时我应该等待什么? 有道理。我想要的是,如果我的任何块出现故障,整个管道都会停止并且异常会冒泡到主线程。 @MichaelLogutov:确实如此。它会将任何故障从BufferBlock 传播到ActionBlock。但是数据和完成/错误都不会通过链接向后传播。 @pnewhook:我更喜欢在出现故障时拆除整个网格(我想大多数人都会这样做),但我们没有必须这样做。如果一个块没有传播完成并且出现故障,那么您可以将其与其他块断开链接并放入替换块(同时网格的其余部分仍在运行)。 请记住,如果您不仅等待 process_block 还等待 data_buffer 块,您可能会以死锁结束,因为 data_buffer 块只有在处理现有项目后才会运行完成。而且由于 process_block 的容量有限,它可能永远不会完成。【参考方案2】:

带有PropagateCompletion 配置的LinkTo 方法将源块的完成传播到目标块。因此,如果源块失败,失败将传播到目标块,因此最终两个块都会完成。如果目标块失败,则情况并非如此。在这种情况下,源块将不会收到通知,并将继续接受和处理消息。如果我们在混合中添加BoundedCapacity 配置,源块的内部输出缓冲区很快就会变满,从而阻止它接受更多消息。正如您所发现的,这很容易导致死锁。

为了防止发生死锁,最简单的方法是确保管道的任何块中的错误都会导致其所有组成块尽快及时完成。正如 Stephen Cleary 的 answer 所指出的,其他方法也是可能的,但在大多数情况下,我希望快速失败的方法是理想的行为。令人惊讶的是,这种简单的行为并不那么容易实现。没有现成的内置机制可用于此目的,并且手动实现它很棘手。

从 .NET 6 开始,强制完成作为数据流管道一部分的块的唯一可靠方法是 Fault 块,并通过将其链接到 NullTarget 来丢弃其输出缓冲区。仅故障块,或通过CancellationToken 选项取消它是不够的。在某些情况下,故障或取消的块将无法完成。 Here 是第一种情况的演示(故障且未完成),here 是第二种情况的演示(已取消且未完成)。这两种情况都要求该块先前已标记为已完成,这对于参与数据流管道的所有块可以自动且不确定地发生,并与PropagateCompletion 配置相关联。存在报告此问题行为的 GitHub 问题:No way to cancel completing dataflow blocks。截至撰写本文时,开发人员尚未提供任何反馈。

有了这些知识,我们可以实现一个LinkTo-on-steroids 方法,该方法可以创建如下所示的快速失败管道:

/// <summary>
/// Connects two blocks that belong in a simple, straightforward,
/// one-way dataflow pipeline.
/// Completion is propagated in both directions.
/// Failure of the target block causes purging of all buffered messages
/// in the source block, allowing the timely completion of both blocks.
/// </summary>
/// <remarks>
/// This method should be used only if the two blocks participate in an exclusive
/// producer-consumer relationship.
/// The source block should be the only producer for the target block, and
/// the target block should be the only consumer of the source block.
/// </remarks>
public static async void ConnectTo<TOutput>(this ISourceBlock<TOutput> source,
    ITargetBlock<TOutput> target)

    source.LinkTo(target, new DataflowLinkOptions  PropagateCompletion = true );
    try  await target.Completion.ConfigureAwait(false);  catch  
    if (!target.Completion.IsFaulted) return;
    if (source.Completion.IsCompleted) return;
    source.Fault(new Exception("Pipeline error."));
    source.LinkTo(DataflowBlock.NullTarget<TOutput>()); // Discard all output

使用示例:

var data_buffer = new BufferBlock<int>(new()  BoundedCapacity = 1 );

var process_block = new ActionBlock<int>(
    x => throw new InvalidOperationException(),
    new()  BoundedCapacity = 2, MaxDegreeOfParallelism = 2 );

data_buffer.ConnectTo(process_block); // Instead of LinkTo

foreach (var k in Enumerable.Range(1, 5))
    if (!await data_buffer.SendAsync(k)) break;

data_buffer.Complete();
await process_block.Completion;

您也可以考虑等待管道的所有组成块,之前等待最后一个(或在finally 区域中之后)。这样做的好处是,在发生故障时,在管道的下一次重生之前,您不会冒着泄漏在后台运行的即发即弃操作的风险:

try  await Task.WhenAll(data_buffer.Completion, process_block.Completion);  catch  

您可以忽略await Task.WhenAll 操作可能引发的所有错误,因为等待最后一个块无论如何都会传达大部分与错误相关的信息。您可能只会错过下游块失败后上游块中发生的其他错误。如果需要,您可以尝试观察所有错误,但这会很棘手,因为错误是如何向下游传播的:您可能会多次观察到相同的错误。如果您想认真记录每一个错误,在处理块的 lambdas 中进行记录可能更容易(也更准确),而不是依赖于它们的 Completion 属性。

缺点:上面的ConnectTo 实现一次将故障向后传播一个块。传播不是即时的,因为在任何当前处理的消息的处理完成之前,故障块不会完成。如果管道很长(5-6 个块或更多),并且每个块的工作量很大,这可能是一个问题。这种额外的延迟不仅浪费时间,而且浪费资源,因为这些工作无论如何都会被丢弃。

我在this GitHub 存储库中上传了一个更复杂的ConnectTo 想法版本。它解决了上一段中提到的延迟完成问题:任何块中的失败都会立即传播到所有块。作为奖励,它还会传播管道中的所有错误,作为平面 AggregateException


注意:此答案已从头开始重写。最初的答案 (Revision 4) 包含一些错误的想法,以及 ConnectTo 方法的有缺陷的实现。

【讨论】:

Recently I realized that this pattern is wrong. 这种模式没有任何问题,或者使用 CancellationTokenSource。数据流不是新事物或仅限于 .NET。有一些特定的模式在语言和平台之间重复,即使它们没有包含在命名空间的文档中。即使是 TPL 数据流也不是新的,它已经有将近 10 年的历史了。它永远不需要使用异常作为控制流来传播完成或取消 async void 只是在乞求问题。一个简单的source.Completion.ContinueWith(_=&gt;target.Complete()); 会做同样的工作而不会冒着 ObjectDisposedException 的风险或无缘无故在异步状态机中抛出异常的成本 @PanagiotisKanavos 我认为我已经非常广泛地解释了仅等待管道中的最后一个块有什么问题。 TPL 数据流的时代以及其他语言和平台正在做什么是无关紧要的。错误的模式不会随着年龄的增长而变得正确。关于primitiveContinueWith 方法,我很容易拒绝它,转而支持负责任且无泄漏的async void 方法。 如果我的ConnectTo 方法有问题,我希望这个错误在光天化日之下浮出水面,而不是在黑暗中被吞没。 catch ?? .NullTarget&lt;TOutput&gt;() ?这就是swallowed in the dark的定义 故意使用空的catch 块来消除错误。处理源块和目标块的错误不是ConnectTo 的工作,因为这个方法并不拥有它们。但是它是target.Complete(); 行的所有者。由于无法以合理的方式处理 this 行的失败,因此允许将其作为未处理的异常传播。恕我直言,这是负责任的事情。将 this 行包装在空的 try-catch 中是不负责任的。这就是ContinueWith 本质上在做什么(如果你让返回的Task 泄漏)。

以上是关于具有有限容量的转换块中的 TPL 数据流异常的主要内容,如果未能解决你的问题,请参考以下文章

TPL 数据流块中的异步/同步工作器委托。哪个更好?

多个短期 TPL 数据流与单个长期运行流

具有延迟的 TPL 数据流队列

具有永久任务/线程的 TPL 数据流块

意外行为 - TPL DataFlow BatchBlock 在 TriggerBatch 执行时拒绝项目

用 TPL 数据流中的最新值替换缓冲值