TPL 数据流 - 非常快的生产者,而不是那么快的消费者 OutOfMemory 异常

Posted

技术标签:

【中文标题】TPL 数据流 - 非常快的生产者,而不是那么快的消费者 OutOfMemory 异常【英文标题】:TPL Dataflow - very fast producer, not so fast consumers OutOfMemory exception 【发布时间】:2017-03-13 10:10:05 【问题描述】:

在将 TPL 数据流移植到我的生产代码之前,我正在试验它。 生产代码是经典的生产者/消费者系统 - 生产者生产消息(与金融领域相关),消费者处理这些消息。

我感兴趣的是,如果在某些时候生产者的生产速度远远超过消费者的处理速度(系统会崩溃,或者会发生什么),更重要的是该怎么做,那么环境将如何保持稳定在那些情况下。

所以为了尝试类似的简单应用程序,我想出了以下内容。

var bufferBlock = new BufferBlock<Item>();

var executiondataflowBlockOptions = new ExecutionDataflowBlockOptions

    MaxDegreeOfParallelism = Environment.ProcessorCount
                        ,
    BoundedCapacity = 100000
;

var dataFlowLinkOptions = new DataflowLinkOptions

    PropagateCompletion = true
;

var actionBlock1 = new ActionBlock<Item>(t => ProcessItem(t),
    executiondataflowBlockOptions);

bufferBlock.LinkTo(actionBlock1, dataFlowLinkOptions);
for (int i = 0; i < int.MaxValue; i++)

    bufferBlock.SendAsync(GenerateItem());


bufferBlock.Complete();
Console.ReadLine();

Item 是一个非常简单的类

internal class Item

    public Item(string itemId)
    
        ItemId = itemId;
    

    public string ItemId  get; 

GenerateItem 只是新闻Item

static Item GenerateItem()

   return new Item(Guid.NewGuid().ToString());

现在,为了模仿没那么快消费者 - 我让ProcessItem 等待100ms

static async Task ProcessItem(Item item)

    await Task.Delay(TimeSpan.FromMilliseconds(100));
    Console.WriteLine($"Processing #item.ItemId item.");

执行此操作会在 20 秒左右导致 OOM 异常。

然后我继续添加更多消费者(更多 ActionBlocks 最多 10 个),这赢得了更多时间,但最终导致相同的 OOM 异常。

我还注意到 GC 承受着巨大的压力(VS 2015 诊断工具显示 GC 几乎一直在运行),所以我为Item 引入了对象池(非常简单,本质上是ConcurrentBag 存储项目) ,但我仍然碰壁(抛出 OOM 异常)。

详细说明内存中的内容,以及内存不足的原因。

最大尺寸有SingleProducerSingleConsumerQueue+Segment&lt;TplDataFlow.Item&gt;ConcurrentQueue+Segment&lt;TplDataFlow.Item&gt;类型的对象 我看到BufferBlock 的 InputBuffer 中充满了Items (Count=14,562,296) 由于我为ActionBlock(s) 设置了BoundedCapacity,它们的输入缓冲区也接近配置的数字 (InputCount=99,996)

为了确保较慢的生产者可以让消费者跟上,我让生产者在迭代之间休眠:

for (int i = 0; i < int.MaxValue; i++)

    Thread.Sleep(TimeSpan.FromMilliseconds(50));
    bufferBlock.SendAsync(GenerateItem());

它工作正常 - 没有抛出异常,内存使用率一直很低,我看不到任何 GC 压力。

所以我有几个问题

    在尝试使用 TPL Dataflow 构建块重现非常快速的生产者/慢速消费者场景时,我做错了什么吗 有什么方法可以使这项工作不会因 OOM 异常而失败。 有关如何在 TPL 数据流上下文中处理此类场景(非常快的生产者/慢速消费者)的最佳实践的任何 cmets/链接。 我对这个问题的理解是 - 由于消费者跟不上,BufferBlock 的内部缓冲区很快就会被消息填满,并且会一直等待消息,直到一些消费者回来询问下一条消息结果应用程序内存不足(由于BufferBlock 的内部缓冲区已满) - 你同意吗?

我正在使用Microsoft.Tpl.Dataflow 包-版本 4.5.24。 .NET 4.5 (C# 6)。进程是 32 位的。

【问题讨论】:

【参考方案1】:

您已经很好地发现了问题:BufferBlock 正在填充其输入缓冲区,直到它达到 OOM。

要解决这个问题,您还应该在缓冲区块中添加一个BoundedCapacity 选项。这将自动为您限制生产者(不需要您的生产者中的Thread.Sleep)。

【讨论】:

感谢您的确认!你能评论一下 BufferBlock 的BoundedCapacity 的行为吗?特别是一旦内部缓冲区已满,传入的消息会发生什么?他们会被丢弃吗? (目前似乎是这样)。您是否可能在您的书 Stephen 中对此进行了更详细的介绍? @Michael:消息永远不会被丢弃。如果您将Post 消息发送到一个完整的块,它将返回false(表示该消息未被接受)。如果您将消息await SendAsync 发送到一个完整的块,它将(异步)等待有空间然后发布消息。见this answer。【参考方案2】:

以下代码可能存在严重问题:

for (int i = 0; i < int.MaxValue; i++)

    bufferBlock.SendAsync(GenerateItem()); // Don't do this!

SendAsync 方法返回一个Task,它在内存方面比您发送到块的实际项目要重得多。在具体示例中,返回的任务总是完成,因为BufferBlock 的容量是无限的,因此任务的内存占用几乎为零(始终返回相同的缓存Task&lt;bool&gt; 实例)。但是在用一个小的BoundedCapacity 值配置块之后,事情很快就会变得有趣(以一种不愉快的方式)。对SendAsync 的每次调用将很快开始返回一个不完整的Task,每次都不同,每个任务的内存占用大约为200 字节(如果还使用了CancellationToken 参数,则为300 字节)。这显然不会很好地扩展。

解决方案是按照预期的方式使用SendAsync。这意味着应该等待:

for (int i = 0; i < int.MaxValue; i++)

    await bufferBlock.SendAsync(GenerateItem()); // It's OK now

这样,生产者将被异步阻塞,直到块内有可用空间来容纳发送的项目。希望这就是你想要的。否则,如果不想阻塞生产者,就不要使用异步的SendAsync方法,而是使用同步的Post方法:

for (int i = 0; i < int.MaxValue; i++)

    var item = GenerateItem();
    while (true)
    
        bool accepted = bufferBlock.Post(item); // Synchronous call
        if (accepted) break; // Break the inner loop
        if (bufferBlock.Completion.IsCompleted) return; // Break both loops

        // Here do other things for a while, before retrying to post the item
    

您也可以使用下杠杆OfferMessage 方法(而不是PostSendAsync):

for (int i = 0; i < int.MaxValue; i++)

    var item = GenerateItem();
    while (true)
    
        var offerResult = ((ITargetBlock<Item>)bufferBlock).OfferMessage(
            new DataflowMessageHeader(1L), item, null, false);
        if (offerResult == DataflowMessageStatus.Accepted) break;
        if (offerResult == DataflowMessageStatus.DecliningPermanently) return;

        // Here do other things for a while, before retrying to offer the item
    

幻数1L 是在TPL 数据流source code 内部声明的值,表示:

一个众所周知的消息 ID,用于只发送一条消息的代码,或者确切的消息 ID 不重要。

【讨论】:

以上是关于TPL 数据流 - 非常快的生产者,而不是那么快的消费者 OutOfMemory 异常的主要内容,如果未能解决你的问题,请参考以下文章

以慢为快的学习思想

非常快的文本文件处理 (C++)

悬停时的 jQuery 动画 |非常快的传球,停止动画

如何设计一个查询速度非常快的搜索引擎?

数据结构之三-红黑树

Java:是不是存在磁盘与内存一样快的情况?