我应该如何使用 DataflowBlockOptions.CancellationToken?
Posted
技术标签:
【中文标题】我应该如何使用 DataflowBlockOptions.CancellationToken?【英文标题】:How should I use DataflowBlockOptions.CancellationToken? 【发布时间】:2015-06-27 04:39:12 【问题描述】:我如何使用DataflowBlockOptions.CancellationToken
?
如果我像这样创建BufferBlock
的实例:
var queue = new BufferBlock<int>(new DataflowBlockOptions BoundedCapacity = 5, CancellationToken = _cts.Token );
那么有使用queue
的消费者/生产者方法,我如何使用它的 CancellationToken 来处理取消?
例如在生产者方法中,我如何检查取消令牌 - 我没有找到任何可以访问令牌的属性..
编辑: 生产/消费方法示例:
private static async Task Produce(BufferBlock<int> queue, IEnumerable<int> values)
foreach (var value in values)
await queue.SendAsync(value);
queue.Complete();
private static async Task<IEnumerable<int>> Consume(BufferBlock<int> queue)
var ret = new List<int>();
while (await queue.OutputAvailableAsync())
ret.Add(await queue.ReceiveAsync());
return ret;
调用它的代码:
var queue = new BufferBlock<int>(new DataflowBlockOptions BoundedCapacity = 5, CancellationToken = _cts.Token );
// Start the producer and consumer.
var values = Enumerable.Range(0, 10);
Produce(queue, values);
var consumer = Consume(queue);
// Wait for everything to complete.
await Task.WhenAll(consumer, queue.Completion);
EDIT2:
如果我调用_cts.Cancel()
,Produce
方法不会取消并且不会中断地结束。
【问题讨论】:
how I can use
是什么意思?如果你想取消操作,你可以在_cts
对象上调用.Cancel
(假设它是一个CancellationTokenSource
。当然你可以随时检查是否有一个取消请求IsCancellationRequested
我可以将取消令牌传递给BufferBlock
构造函数(通过DataflowBlockOptions
)。然后,如果我将这个BufferBlock
的实例传递给某个库的produce 方法,该方法就无法从BufferBlock
访问令牌。那么这个有什么用呢?
你能举一个完整的例子吗?在这种情况下,您最终也可以将令牌提供给您要创建的任务see here for an introduction
@CarstenKönig 看到我编辑的问题。我对如何使用DataflowBlockOptions.CancellationToken
中传递的令牌或将其传递给BufferBlock
的构造函数的目的特别感兴趣
【参考方案1】:
如果您想取消生产过程,您应该在其中传递令牌,如下所示:
private static async Task Produce(
BufferBlock<int> queue,
IEnumerable<int> values,
CancellationToken token
)
foreach (var value in values)
await queue.SendAsync(value, token);
Console.WriteLine(value);
queue.Complete();
private static async Task<IEnumerable<int>> Consume(BufferBlock<int> queue)
var ret = new List<int>();
while (await queue.OutputAvailableAsync())
ret.Add(await queue.ReceiveAsync());
return ret;
static void Main(string[] args)
var cts = new CancellationTokenSource();
var queue = new BufferBlock<int>(new DataflowBlockOptions BoundedCapacity = 5, CancellationToken = cts.Token );
// Start the producer and consumer.
var values = Enumerable.Range(0, 100);
Produce(queue, values, cts.Token);
var consumer = Consume(queue);
cts.Cancel();
try
Task.WaitAll(consumer, queue.Completion);
catch (Exception e)
Console.WriteLine(e.ToString());
foreach (var i in consumer.Result)
Console.WriteLine(i);
Console.ReadKey();
【讨论】:
如果您修改Produce
以在循环中输出值,您可以看到取消操作不会阻止生产者发送到队列。我的期望是在await queue.SendAsync(i);
线上它会抛出TaskCancellationException。此外,Produce() 无法从queue
访问 CancellationToken 以检查其状态并自行取消。由此看来,将取消令牌直接传递给 Produce/Consume 方法似乎比依赖 DataflowBlockOptions 更好。不过,我不确定我是否遗漏了什么......
@FilipHurta 也许,SendAsync 没有检查取消,因为发送端和接收端应该有最少的共享资源,或者性能原因【参考方案2】:
通常您使用CancellationToken
选项来控制数据流块的取消,使用外部CancellationTokenSource
。取消阻止(假设它是TransformBlock
)会立即产生以下效果:
-
块停止接受传入消息。调用其
Post
返回false
,这意味着提供的消息被拒绝。
当前存储在块的内部输入缓冲区中的消息会立即被丢弃。这些消息会丢失。它们不会被处理或传播。
如果该块当前没有处理任何消息,以下效果也将立即生效。否则,当所有当前处理的消息处理完成时,它们将跟随:
-
当前存储在此块的输出缓冲区中的所有已处理消息都将被丢弃。最后处理的消息(取消发生时正在处理的消息)将不会传播到下游的链接块。
任何以块为目标的未决异步
SendAsync
操作(在取消发生时正在进行中)将完成并返回 false
(意思是“未接受”)。
表示块的Completion
的Task
转换到Canceled
状态。换句话说,这个任务的IsCanceled
属性变为true
。
您可以通过调用块的Fault
方法直接实现除最后一个效果之外的所有效果,而无需使用CancellationToken
选项。此方法可通过所有块实现的IDataflowBlock
接口访问。你可以这样使用它:
((IDataflowBlock)block).Fault(new OperationCanceledException());
不同之处在于Completion
任务现在将变为Faulted
而不是Canceled
。这种差异可能很重要,也可能不重要,具体取决于具体情况。如果您只是 await
和 Completion
,这是通常使用此属性的方式,在这两种情况下都会抛出 OperationCanceledException
。因此,如果您不需要对 Completion
属性做任何花哨的事情,并且出于某种原因还想避免配置 CancellationToken
,则可以考虑使用此技巧。
更新:在调用Complete
方法后发生取消时的行为,换句话说,当块已处于完成阶段但尚未完成时:
-
如果块是一个处理块,比如
TransformBlock
,以上所有的都会发生同样的事情。该块将很快转换为Canceled
状态。
如果块是非处理块,例如BufferBlock<T>
,则上面列表中的 (3) 将不会发生。 BufferBlock<T>
的输出缓冲区不会被清空,当取消发生在调用 Complete
方法之后。有关此行为的演示,请参阅 this GitHib 问题。请注意Complete
方法不仅可以手动调用,还可以自动调用,前提是该块已作为源块的目标链接,并启用了PropagateCompletion
配置。您可能想查看this 问题,以充分了解此行为的含义。长话短说,取消包含BufferBlock<T>
的数据流管道的所有块并不能保证管道会终止。
旁注:当Complete
和Fault
方法都被调用时,首先调用的是块的最终状态。如果首先调用了Complete
,则该块将以RanToCompletion
状态完成。如果首先调用Fault
,则该块将以Faulted
状态完成。对Complete
d 块进行故障处理仍然会产生影响:它会清空其内部输入缓冲区。
【讨论】:
以上是关于我应该如何使用 DataflowBlockOptions.CancellationToken?的主要内容,如果未能解决你的问题,请参考以下文章
我应该在 __main__.py 中使用啥形式的导入,然后我应该如何运行项目?