如何对连接的 TPL 数据流块进行错误处理?
Posted
技术标签:
【中文标题】如何对连接的 TPL 数据流块进行错误处理?【英文标题】:How to do error handling with connected TPL dataflow blocks? 【发布时间】:2021-11-10 18:14:02 【问题描述】:我好像不懂TPL Dataflow错误处理。
假设我有一个我想要处理的项目列表,我为此使用了一个 ActionBlock:
var actionBlock = new ActionBlock<int[]>(async tasks =>
foreach (var task in tasks)
await Task.Delay(1);
if (task > 30)
throw new InvalidOperationException();
Console.WriteLine("0 Completed", task);
, new ExecutionDataflowBlockOptions
BoundedCapacity = 200,
MaxDegreeOfParallelism = 4
);
for (i = 0; i < 10000; i++)
if (!await bufferBlock.SendAsync(i))
break;
actionBlock.Complete();
await actionBlock.Completion;
如果发生错误,块将转换为故障状态,并且 SendAsync(...) 返回 false。我可以停止我的循环并完成它,当我等待完成时会引发异常。到目前为止一切顺利。
当我在两者之间放置一个 BufferBlock 时,它不再起作用:
bufferBlock.LinkTo(actionBlock, new DataflowLinkOptions
PropagateCompletion = true
);
for (i = 0; i < 10000; i++)
if (!await bufferBlock.SendAsync(i, cts.Token))
break;
bufferBlock.Complete();
await actionBlock.Completion;
对 SendAsync() 的调用只是永远“阻塞”,因为 BufferBlock 永远不会转换到故障状态。
我找到的唯一解决方案是:
using (var cts = new CancellationTokenSource())
actionBlock.Completion.ContinueWith(x =>
if (x.Status != TaskStatus.RanToCompletion)
cts.Cancel();
);
var i = 0;
try
for (i = 0; i < 10000; i++)
if (cts.Token.IsCancellationRequested)
break;
if (!await bufferBlock.SendAsync(i, cts.Token))
break;
catch (OperationCanceledException)
bufferBlock.Complete();
await actionBlock.Completion;
因为状态会传播,所以我必须监听网络中最后一个块的状态,当这个块停止时,我必须停止循环。
这是使用 Dataflow 库的预期方式还是有更好的解决方案?
【问题讨论】:
这里是相关问题:TPL Dataflow exception in transform block with bounded capacity。顺便说一句,我对这个问题的回答很好地解决了这个棘手的问题。这个 GitHub 问题显示了问题的另一个方面:No way to cancel completing dataflow blocks。 这不是处理管道错误的正确方法,错误也不应该倒流。块不是函数调用,链接不代表任何类型的所有权。这不是 TPL 数据流问题。 Go 频道也是如此。 【参考方案1】:不允许未处理的异常。块中未处理的异常意味着块以及整个管道最终被破坏并且必须中止。这不是 TPL 数据流错误,而是整个数据流范例的工作方式。异常是为了向调用堆栈发出错误信号。但数据流中没有调用堆栈。
块是通过消息进行通信的独立工作者。链接块和故障块之间没有所有权关系并不意味着任何先前或后续块也应该中止。这就是为什么 PropagateCompletion
默认为 false
的原因。
如果源链接到多个块,则消息可以轻松转到其他块。也可以在运行时更改块之间的链接。
在管道中有两种不同的错误:
block/actor/worker 处理消息时发生的消息错误 使管道无效并且可能需要中止的管道错误如果单个消息出错,则没有理由中止管道。
消息错误
如果在处理消息时出现问题,参与者应该对该消息执行一些操作,然后继续处理下一个。那可能是:
记录错误并继续 向另一个块发送“错误”消息 在整个管道中使用Result<TMessage,TError>
类而不是使用原始消息类型,并将任何错误添加到结果中
重试和恢复策略可以建立在此之上,例如将任何失败的消息转发到“重试”块或死消息块
最简单的方法是捕获异常并记录它们:
var block=new ActionBlock<int[]>(msg=>
try
...
catch(Exception exc)
_logger.LogError(exc);
);
另一种选择是手动发布到死信队列:
var dead=new BufferBlock<(int[] data,Exception error)>();
var block=new ActionBlock<int[]>(msg=>
try
...
catch(Exception exc)
await _dead.SendAsync(msg,exc);
_logger.LogError(exc);
);
更进一步,可以定义一个Result<TMessage,TError>
类来包装结果。下游块可以忽略错误结果。 LinkTo
谓词也可用于重新路由错误消息。我会作弊并将错误硬编码到Exception
。更好的实现会使用不同的类型来表示成功和错误:
record Result<TMessage>(TMessage? Message,Exception ? Error)
public bool HasError=>error!=null;
var block1=new TransformBlock<Result<int[]>,Result<double>>(msg=>
if (msg.HasError)
//Propagate the error message
return new Result<double>(default,msg.Error);
try
var sum=(double)msg.Message.Sum();
if (sum % 5 ==0)
throw new Exception("Why not?");
return new Result(sum,null);
catch(Exception exc)
return new Result(null,exc);
);
var block2=new ActionBlock<Result<double>>(...);
block1.LinkTo(block2);
另一种选择是将错误消息重定向到不同的块:
var errorBlock=new ActionBlock<Result<int[]>>(msg=>
_logger.LogError(msg.Error);
);
block1.LinkTo(errorBlock,msg=>msg.HasError);
block1.LinkTo(block2);
这会将所有错误消息重定向到错误块。所有其他消息都转到block2
管道错误
在某些情况下,错误非常严重,以至于当前块无法恢复,甚至可能必须取消/中止整个管道。 .NET 中的取消是通过 CancellationToken 处理的。所有块都接受 CancellationToken 以允许中止。
没有适用于所有管道的单一中止策略。向前传播取消是常见的,但绝对不是唯一的选择。
在最简单的情况下,
var pipeLineCancellation = new CancellationTokenSource();
var block1=new TransformBlock<Result<int[]>,Result<double>>(msg=>
...
,
new ExecutionDataflowBlockOptions
CancellationToken=pipeLineCancellation.Token
);
如果出现严重错误,块异常处理程序可以请求取消:
//Wrong table name. We can't use the database
catch(SqlException exc) when (exc.Number ==208)
...
pipeLineCancellation.Cancel();
这将中止使用相同 CancellationTokenSource 的所有块。但这并不意味着 所有 块都应该连接到同一个 CancellationTokenSource。
向后流动取消
在 Go 管道中,通常使用 error
通道向前一个块发送取消消息。同样可以在 C# 中使用链接的CancellationTokenSource
s 完成。甚至可以说这比 Go 还要好。
可以使用CreateLinkedTokenSource 创建多个链接的 CancellationTokenSource。通过创建向后链接的源,我们可以为其自己的源进行块信号消除,并将消除流向根。
var cts5=new CancellationTokenSource();
var cts4=CancellationTokenSource.CreateLinkedTokenSource(cts5.Token);
...
var cts1=CancellationTokenSource.CreateLinkedTokenSource(cts2.Token);
...
var block3=new TransformBlock<Result<int[]>,Result<double>>(msg=>
...
catch(SqlException)
cts3.Cancel();
,
new ExecutionDataflowBlockOptions
CancellationToken=cts3.Token
);
这将在不取消下游块的情况下,逐个块地向后发出取消信号。
管道模式
.NET 中的数据流是鲜为人知的瑰宝,因此很难找到好的参考和模式。不过,Go 中的概念相似,因此可以使用 Go Concurrency Patterns: Pipelines and cancellation 中的模式。
TPL 数据流实现了处理循环和完成传播,因此通常只需要提供处理消息的Action
或Func
。其余的模式必须实现,尽管 .NET 比 Go 有一些优势。
done
频道本质上是一个 CancellationTokenSource。
已通过现有块处理扇入、扇出,或者可以使用克隆消息的相对简单的自定义块来处理
CancellationTokenSource
s 可以显式链接。在 Go 中,每个“阶段”(本质上是一个块)都必须将完成/取消传播到其他阶段
所有阶段/块都可以使用一个 CancellationTokenSource。
链接不仅允许更轻松的组合,还允许对管道/网格进行运行时修改。
假设我们想在一段时间后停止处理消息,即使没有错误。所需要的只是创建一个供所有块使用的 CTS:
var pipeLineCancellation = new CancellationTokenSource();
var block1=new TransformBlock<Result<int[]>,Result<double>>(msg=>
...
,
new ExecutionDataflowBlockOptions
CancellationToken=pipeLineCancellation.Token
);
var block2 =.....;
pipeLineCancellation.Cancel();
也许我们只想让管道运行一分钟?轻松使用
var pipeLineCancellation = new CancellationTokenSource(60000);
也有一些缺点,因为 Dataflow 块无法访问“通道”或控制循环
在 Go 中,很容易将data
、error
和 done
通道传递到每个阶段,从而简化错误报告和完成。在 .NET 中,块委托可能必须直接访问其他块或 CTS。
在 Go 中,使用公共状态来积累数据或管理会话/远程连接状态更容易。想象一下控制像 Selenium 这样的屏幕刮板的舞台/块。我们真的不想在每条消息上都重新启动浏览器。
或者我们可能希望使用 SqlBulkCopy 将数据插入数据库。使用 ActionBlock,我们必须为每个批次创建一个新实例,这可能是也可能不是问题。
【讨论】:
以上是关于如何对连接的 TPL 数据流块进行错误处理?的主要内容,如果未能解决你的问题,请参考以下文章