TPL 数据流 - 非常快的生产者,而不是那么快的消费者 OutOfMemory 异常
Posted
技术标签:
【中文标题】TPL 数据流 - 非常快的生产者,而不是那么快的消费者 OutOfMemory 异常【英文标题】:TPL Dataflow - very fast producer, not so fast consumers OutOfMemory exception 【发布时间】:2017-03-13 10:10:05 【问题描述】:在将 TPL 数据流移植到我的生产代码之前,我正在试验它。 生产代码是经典的生产者/消费者系统 - 生产者生产消息(与金融领域相关),消费者处理这些消息。
我感兴趣的是,如果在某些时候生产者的生产速度远远超过消费者的处理速度(系统会崩溃,或者会发生什么),更重要的是该怎么做,那么环境将如何保持稳定在那些情况下。
所以为了尝试类似的简单应用程序,我想出了以下内容。
var bufferBlock = new BufferBlock<Item>();
var executiondataflowBlockOptions = new ExecutionDataflowBlockOptions
MaxDegreeOfParallelism = Environment.ProcessorCount
,
BoundedCapacity = 100000
;
var dataFlowLinkOptions = new DataflowLinkOptions
PropagateCompletion = true
;
var actionBlock1 = new ActionBlock<Item>(t => ProcessItem(t),
executiondataflowBlockOptions);
bufferBlock.LinkTo(actionBlock1, dataFlowLinkOptions);
for (int i = 0; i < int.MaxValue; i++)
bufferBlock.SendAsync(GenerateItem());
bufferBlock.Complete();
Console.ReadLine();
Item
是一个非常简单的类
internal class Item
public Item(string itemId)
ItemId = itemId;
public string ItemId get;
GenerateItem
只是新闻Item
static Item GenerateItem()
return new Item(Guid.NewGuid().ToString());
现在,为了模仿没那么快消费者 - 我让ProcessItem
等待100ms
。
static async Task ProcessItem(Item item)
await Task.Delay(TimeSpan.FromMilliseconds(100));
Console.WriteLine($"Processing #item.ItemId item.");
执行此操作会在 20 秒左右导致 OOM 异常。
然后我继续添加更多消费者(更多 ActionBlocks 最多 10 个),这赢得了更多时间,但最终导致相同的 OOM 异常。
我还注意到 GC 承受着巨大的压力(VS 2015 诊断工具显示 GC 几乎一直在运行),所以我为Item
引入了对象池(非常简单,本质上是ConcurrentBag
存储项目) ,但我仍然碰壁(抛出 OOM 异常)。
详细说明内存中的内容,以及内存不足的原因。
最大尺寸有SingleProducerSingleConsumerQueue+Segment<TplDataFlow.Item>
和ConcurrentQueue+Segment<TplDataFlow.Item>
类型的对象
我看到BufferBlock
的 InputBuffer 中充满了Item
s (Count=14,562,296)
由于我为ActionBlock
(s) 设置了BoundedCapacity
,它们的输入缓冲区也接近配置的数字 (InputCount=99,996)
为了确保较慢的生产者可以让消费者跟上,我让生产者在迭代之间休眠:
for (int i = 0; i < int.MaxValue; i++)
Thread.Sleep(TimeSpan.FromMilliseconds(50));
bufferBlock.SendAsync(GenerateItem());
它工作正常 - 没有抛出异常,内存使用率一直很低,我看不到任何 GC 压力。
所以我有几个问题
-
在尝试使用 TPL Dataflow 构建块重现非常快速的生产者/慢速消费者场景时,我做错了什么吗
有什么方法可以使这项工作不会因 OOM 异常而失败。
有关如何在 TPL 数据流上下文中处理此类场景(非常快的生产者/慢速消费者)的最佳实践的任何 cmets/链接。
我对这个问题的理解是 - 由于消费者跟不上,
BufferBlock
的内部缓冲区很快就会被消息填满,并且会一直等待消息,直到一些消费者回来询问下一条消息结果应用程序内存不足(由于BufferBlock
的内部缓冲区已满) - 你同意吗?
我正在使用Microsoft.Tpl.Dataflow
包-版本 4.5.24。
.NET 4.5 (C# 6)。进程是 32 位的。
【问题讨论】:
【参考方案1】:您已经很好地发现了问题:BufferBlock
正在填充其输入缓冲区,直到它达到 OOM。
要解决这个问题,您还应该在缓冲区块中添加一个BoundedCapacity
选项。这将自动为您限制生产者(不需要您的生产者中的Thread.Sleep
)。
【讨论】:
感谢您的确认!你能评论一下 BufferBlock 的BoundedCapacity
的行为吗?特别是一旦内部缓冲区已满,传入的消息会发生什么?他们会被丢弃吗? (目前似乎是这样)。您是否可能在您的书 Stephen 中对此进行了更详细的介绍?
@Michael:消息永远不会被丢弃。如果您将Post
消息发送到一个完整的块,它将返回false
(表示该消息未被接受)。如果您将消息await SendAsync
发送到一个完整的块,它将(异步)等待有空间然后发布消息。见this answer。【参考方案2】:
以下代码可能存在严重问题:
for (int i = 0; i < int.MaxValue; i++)
bufferBlock.SendAsync(GenerateItem()); // Don't do this!
SendAsync
方法返回一个Task
,它在内存方面比您发送到块的实际项目要重得多。在具体示例中,返回的任务总是完成,因为BufferBlock
的容量是无限的,因此任务的内存占用几乎为零(始终返回相同的缓存Task<bool>
实例)。但是在用一个小的BoundedCapacity
值配置块之后,事情很快就会变得有趣(以一种不愉快的方式)。对SendAsync
的每次调用将很快开始返回一个不完整的Task
,每次都不同,每个任务的内存占用大约为200 字节(如果还使用了CancellationToken
参数,则为300 字节)。这显然不会很好地扩展。
解决方案是按照预期的方式使用SendAsync
。这意味着应该等待:
for (int i = 0; i < int.MaxValue; i++)
await bufferBlock.SendAsync(GenerateItem()); // It's OK now
这样,生产者将被异步阻塞,直到块内有可用空间来容纳发送的项目。希望这就是你想要的。否则,如果不想阻塞生产者,就不要使用异步的SendAsync
方法,而是使用同步的Post
方法:
for (int i = 0; i < int.MaxValue; i++)
var item = GenerateItem();
while (true)
bool accepted = bufferBlock.Post(item); // Synchronous call
if (accepted) break; // Break the inner loop
if (bufferBlock.Completion.IsCompleted) return; // Break both loops
// Here do other things for a while, before retrying to post the item
您也可以使用下杠杆OfferMessage
方法(而不是Post
或SendAsync
):
for (int i = 0; i < int.MaxValue; i++)
var item = GenerateItem();
while (true)
var offerResult = ((ITargetBlock<Item>)bufferBlock).OfferMessage(
new DataflowMessageHeader(1L), item, null, false);
if (offerResult == DataflowMessageStatus.Accepted) break;
if (offerResult == DataflowMessageStatus.DecliningPermanently) return;
// Here do other things for a while, before retrying to offer the item
幻数1L
是在TPL 数据流source code 内部声明的值,表示:
一个众所周知的消息 ID,用于只发送一条消息的代码,或者确切的消息 ID 不重要。
【讨论】:
以上是关于TPL 数据流 - 非常快的生产者,而不是那么快的消费者 OutOfMemory 异常的主要内容,如果未能解决你的问题,请参考以下文章