通过 BufferBlock 的背压不起作用。 (C# TPL 数据流)

Posted

技术标签:

【中文标题】通过 BufferBlock 的背压不起作用。 (C# TPL 数据流)【英文标题】:Backpressure via BufferBlock not working. (C# TPL Dataflow) 【发布时间】:2020-09-09 02:01:24 【问题描述】:

典型情况:生产者快,消费者慢,需要让生产者慢下来。 无法按预期工作的示例代码(如下所述):

//  I assumed this block will behave like BlockingCollection, but it doesn't 
var bb = new BufferBlock<int>(new DataflowBlockOptions 
    BoundedCapacity = 3, // looks like this does nothing
);

// fast producer
int dataSource = -1;
var producer = Task.Run(() => 
    while (dataSource < 10) 
        var message = ++dataSource;
        bb.Post(message);
        Console.WriteLine($"Posted: message");
    
    Console.WriteLine("Calling .Complete() on buffer block");
    bb.Complete();
);

// slow consumer
var ab = new ActionBlock<int>(i => 
    Thread.Sleep(500);
    Console.WriteLine($"Received: i");
, new ExecutionDataflowBlockOptions 
    MaxDegreeOfParallelism = 2,
);

bb.LinkTo(ab);

ab.Completion.Wait();

我认为这段代码会起作用,但它不起作用:

BufferBlock bb 是容量为 3 的阻塞队列。一旦达到容量,生产者应该无法 .Post() 到它,直到有一个空槽。 不能那样工作。 bb 似乎很乐意接受任意数量的消息。 producer 是一个快速发布消息的任务。发布所有消息后,对bb.Complete() 的调用应通过管道传播,并在处理完所有消息后发出关闭信号。因此在最后等待ab.Completion.Wait();。 也不起作用。只要调用了.Complete(),动作块ab 就不会再收到任何消息。

可以使用BlockingCollection 来完成,我认为在 TPL 数据流 (TDF) 世界中 BufferBlock 相当于。我想我误解了背压在 TPL 数据流中应该如何工作。

那么问题在哪里?如何运行此管道,不允许缓冲区bb 中的消息超过 3 条,并等待其完成?

PS:我发现了这个要点 (https://gist.github.com/mnadel/df2ec09fe7eae9ba8938),建议在其中维护一个信号量来阻止写入 BufferBlock。我认为这是“内置”的。

接受答案后更新:

接受答案后更新:

如果你正在看这个问题,你需要记住ActionBlock 也有自己的输入缓冲区。

这是一个。然后您还需要意识到,因为所有块都有自己的输入缓冲区,所以您不需要BufferBlock,因为您可能认为它的名字暗示了它。 BufferBlock 更像是用于更复杂架构的实用程序块或平衡加载块。但它不是背压缓冲区。

完成传播需要在链接级别明确定义。

调用.LinkTo() 时需要显式传递new DataflowLinkOptions PropagateCompletion = true 作为第二个参数。

【问题讨论】:

尝试使用SendAsync 代替Post?它阻塞直到有空闲空间 是的,你想要的是await someBlock.SendAsync(item),它将等待块队列中的空闲空间。 Post 在无法插入项目时只会返回 false。 您应该发布您的更新作为答案并接受它,因为它是真正的解决方案(我将发布您所说的内容)。如果你这样做,我会投票赞成。 @ScottChamberlain 感谢您的建议,有点懒惰。顺便说一句,如果您有比我现在发布的更好的示例,请在此处发布,我很乐意更改已接受的答案! 【参考方案1】:

要引入背压,您需要在将项目发送到块时使用SendAsync。这允许您的生产者等待该块为该项目准备好。您正在寻找这样的东西:

class Program

    static async Task Main()
    
        var options = new ExecutionDataflowBlockOptions()
        
            BoundedCapacity = 3
        ;
        var block = new ActionBlock<int>(async i =>
        
            await Task.Delay(100);
            Console.WriteLine(i);
        , options);

        //Producer
        foreach (var i in Enumerable.Range(0, 10))
        
            await block.SendAsync(i);
        

        block.Complete();
        await block.Completion;
    

如果您将其更改为使用Post 并打印Post 的结果,您将看到许多项目未能传递到块:

class Program

    static async Task Main()
    
        var options = new ExecutionDataflowBlockOptions()
        
            BoundedCapacity = 1
        ;
        var block = new ActionBlock<int>(async i =>
        
            await Task.Delay(1000);
            Console.WriteLine(i);
        , options);

        //Producer
        foreach (var i in Enumerable.Range(0, 10))
        
            var result = block.Post(i);
            Console.WriteLine(result);
        

        block.Complete();
        await block.Completion;
    

输出:

True
False
False
False
False
False
False
False
False
False
0

【讨论】:

【参考方案2】:

在 JSteward 的回答的指导下,我想出了以下代码。 它在处理所述项目的同时产生(读取等)新项目,维护一个预读缓冲区。 当“生产者”没有更多项目时,完成信号被发送到链的头部。 该程序还在终止之前等待整个链的完成。

static async Task Main() 

    string Time() => $"DateTime.Now:hh:mm:ss.fff";

    // the buffer is added to the chain just for demonstration purposes
    // the chain would work fine using just the built-in input buffer
    // of the `action` block.
    var buffer = new BufferBlock<int>(new DataflowBlockOptions BoundedCapacity = 3);

    var action = new ActionBlock<int>(async i =>
    
        Console.WriteLine($"[Time()]: Processing: i");
        await Task.Delay(500);
    , new ExecutionDataflowBlockOptions MaxDegreeOfParallelism = 2, BoundedCapacity = 2);

    // it's necessary to set `PropagateCompletion` property
    buffer.LinkTo(action, new DataflowLinkOptions PropagateCompletion = true);

    //Producer
    foreach (var i in Enumerable.Range(0, 10))
    
        Console.WriteLine($"[Time()]: Ready to send: i");
        await buffer.SendAsync(i);
        Console.WriteLine($"[Time()]: Sent: i");
    

    // we call `.Complete()` on the head of the chain and it's propagated forward
    buffer.Complete(); 

    await action.Completion;

【讨论】:

以上是关于通过 BufferBlock 的背压不起作用。 (C# TPL 数据流)的主要内容,如果未能解决你的问题,请参考以下文章

flink的背压问题产生原因和解决方法

如何处理 Kafka Connect Sink 中的背压?

Spring Web-Flux 中的背压机制

一文带你全面了解RxJava的背压策略

Flink 如何处理背压

node.js背压和排放事件。我遇到内存泄漏警告