我应该如何使用 DataflowBlockOptions.CancellationToken?

Posted

技术标签:

【中文标题】我应该如何使用 DataflowBlockOptions.CancellationToken?【英文标题】:How should I use DataflowBlockOptions.CancellationToken? 【发布时间】:2015-06-27 04:39:12 【问题描述】:

我如何使用DataflowBlockOptions.CancellationToken

如果我像这样创建BufferBlock 的实例:

var queue = new BufferBlock<int>(new DataflowBlockOptions BoundedCapacity = 5, CancellationToken = _cts.Token );

那么有使用queue 的消费者/生产者方法,我如何使用它的 CancellationToken 来处理取消?

例如在生产者方法中,我如何检查取消令牌 - 我没有找到任何可以访问令牌的属性..

编辑: 生产/消费方法示例:

private static async Task Produce(BufferBlock<int> queue, IEnumerable<int> values)

    foreach (var value in values)
    
        await queue.SendAsync(value);
    

    queue.Complete();


private static async Task<IEnumerable<int>> Consume(BufferBlock<int> queue)

    var ret = new List<int>();
    while (await queue.OutputAvailableAsync())
    
        ret.Add(await queue.ReceiveAsync());
    

    return ret;

调用它的代码:

var queue = new BufferBlock<int>(new DataflowBlockOptions  BoundedCapacity = 5, CancellationToken = _cts.Token );

// Start the producer and consumer.
var values = Enumerable.Range(0, 10);
Produce(queue, values);
var consumer = Consume(queue);

// Wait for everything to complete.
await Task.WhenAll(consumer, queue.Completion);

EDIT2:

如果我调用_cts.Cancel()Produce 方法不会取消并且不会中断地结束。

【问题讨论】:

how I can use 是什么意思?如果你想取消操作,你可以在_cts对象上调用.Cancel(假设它是一个CancellationTokenSource。当然你可以随时检查是否有一个取消请求IsCancellationRequested 我可以将取消令牌传递给BufferBlock构造函数(通过DataflowBlockOptions)。然后,如果我将这个BufferBlock 的实例传递给某个库的produce 方法,该方法就无法从BufferBlock 访问令牌。那么这个有什么用呢? 你能举一个完整的例子吗?在这种情况下,您最终也可以将令牌提供给您要创建的任务see here for an introduction @CarstenKönig 看到我编辑的问题。我对如何使用DataflowBlockOptions.CancellationToken 中传递的令牌或将其传递给BufferBlock 的构造函数的目的特别感兴趣 【参考方案1】:

如果您想取消生产过程,您应该在其中传递令牌,如下所示:

    private static async Task Produce(
        BufferBlock<int> queue, 
        IEnumerable<int> values,
        CancellationToken token
        )
    
        foreach (var value in values)
        
            await queue.SendAsync(value, token);
            Console.WriteLine(value);
        

        queue.Complete();
    

    private static async Task<IEnumerable<int>> Consume(BufferBlock<int> queue)
    
        var ret = new List<int>();
        while (await queue.OutputAvailableAsync())
        
            ret.Add(await queue.ReceiveAsync());
        

        return ret;
    

    static void Main(string[] args)
    
        var cts = new CancellationTokenSource();

        var queue = new BufferBlock<int>(new DataflowBlockOptions  BoundedCapacity = 5, CancellationToken = cts.Token );

        // Start the producer and consumer.
        var values = Enumerable.Range(0, 100);
        Produce(queue, values, cts.Token);
        var consumer = Consume(queue);

        cts.Cancel();

        try
        
            Task.WaitAll(consumer, queue.Completion);
        
        catch (Exception e)
        
            Console.WriteLine(e.ToString());
        

        foreach (var i in consumer.Result)
        
            Console.WriteLine(i);
        

        Console.ReadKey();

【讨论】:

如果您修改Produce 以在循环中输出值,您可以看到取消操作不会阻止生产者发送到队列。我的期望是在await queue.SendAsync(i); 线上它会抛出TaskCancellationException。此外,Produce() 无法从queue 访问 CancellationToken 以检查其状态并自行取消。由此看来,将取消令牌直接传递给 Produce/Consume 方法似乎比依赖 DataflowBlockOptions 更好。不过,我不确定我是否遗漏了什么...... @FilipHurta 也许,SendAsync 没有检查取消,因为发送端和接收端应该有最少的共享资源,或者性能原因【参考方案2】:

通常您使用CancellationToken 选项来控制数据流块的取消,使用外部CancellationTokenSource。取消阻止(假设它是TransformBlock)会立即产生以下效果:

    块停止接受传入消息。调用其Post 返回false,这意味着提供的消息被拒绝。 当前存储在块的内部输入缓冲区中的消息会立即被丢弃。这些消息会丢失。它们不会被处理或传播。

如果该块当前没有处理任何消息,以下效果也将立即生效。否则,当所有当前处理的消息处理完成时,它们将跟随:

    当前存储在此块的输出缓冲区中的所有已处理消息都将被丢弃。最后处理的消息(取消发生时正在处理的消息)将不会传播到下游的链接块。 任何以块为目标的未决异步 SendAsync 操作(在取消发生时正在进行中)将完成并返回 false(意思是“未接受”)。 表示块的CompletionTask 转换到Canceled 状态。换句话说,这个任务的IsCanceled 属性变为true

您可以通过调用块的Fault 方法直接实现除最后一个效果之外的所有效果,而无需使用CancellationToken 选项。此方法可通过所有块实现的IDataflowBlock 接口访问。你可以这样使用它:

((IDataflowBlock)block).Fault(new OperationCanceledException());

不同之处在于Completion 任务现在将变为Faulted 而不是Canceled。这种差异可能很重要,也可能不重要,具体取决于具体情况。如果您只是 awaitCompletion,这是通常使用此属性的方式,在这两种情况下都会抛出 OperationCanceledException。因此,如果您不需要对 Completion 属性做任何花哨的事情,并且出于某种原因还想避免配置 CancellationToken,则可以考虑使用此技巧。


更新:在调用Complete 方法后发生取消时的行为,换句话说,当块已处于完成阶段但尚未完成时:

    如果块是一个处理块,比如TransformBlock,以上所有的都会发生同样的事情。该块将很快转换为Canceled 状态。 如果块是非处理块,例如BufferBlock&lt;T&gt;,则上面列表中的 (3) 将不会发生。 BufferBlock&lt;T&gt; 的输出缓冲区不会被清空,当取消发生在调用 Complete 方法之后。有关此行为的演示,请参阅 this GitHib 问题。请注意Complete 方法不仅可以手动调用,还可以自动调用,前提是该块已作为源块的目标链接,并启用了PropagateCompletion 配置。您可能想查看this 问题,以充分了解此行为的含义。长话短说,取消包含BufferBlock&lt;T&gt; 的数据流管道的所有块并不能保证管道会终止。

旁注:当CompleteFault 方法都被调用时,首先调用的是块的最终状态。如果首先调用了Complete,则该块将以RanToCompletion 状态完成。如果首先调用Fault,则该块将以Faulted 状态完成。对Completed 块进行故障处理仍然会产生影响:它会清空其内部输入缓冲区。

【讨论】:

以上是关于我应该如何使用 DataflowBlockOptions.CancellationToken?的主要内容,如果未能解决你的问题,请参考以下文章

我应该在 __main__.py 中使用啥形式的导入,然后我应该如何运行项目?

我应该如何使用 DataflowBlockOptions.CancellationToken?

我应该如何使用 CMake 构建 MPI C++ 程序?

我应该如何在 MySQL 中使用 NOT IN 语句

如果我从多个线程中使用 JSch,我应该如何使用它

我应该如何使用 Optional 类型提示?