TPL Dataflow:ActionBlock,避免在每次调用其委托时重复运行 using-block(例如用于写入 StreamWriter)

Posted

技术标签:

【中文标题】TPL Dataflow:ActionBlock,避免在每次调用其委托时重复运行 using-block(例如用于写入 StreamWriter)【英文标题】:TPL Dataflow: ActionBlock that avoids repeatedly running a using-block (such as for writing to a StreamWriter) on every invocation of its delegate 【发布时间】:2021-09-22 17:29:58 【问题描述】:

我需要从 IDataReader 读取 1M 行,并同时写入 n 个文本文件。这些文件中的每一个都将是可用列的不同子集;所有 n 个文本文件在完成后将有 1M 行长。

当前计划是一个 TransformManyBlock 来迭代 IDataReader,链接到广播块,链接到 n 个 BufferBlock/ActionBlock 对。

我要避免的是让我的 ActionBlock 委托执行 using (StreamWriter x...) x.WriteLine(); 来打开和关闭每个输出文件一百万次。

我目前的想法是代替ActionBlock,写一个实现ITargetBlock的自定义类。有没有更简单的方法?

编辑 1:讨论对我当前的问题很有价值,但到目前为止的答案都非常关注文件系统行为。为了将来的搜索者的利益,问题的重点是如何在 ActionBlock 委托之外构建某种设置/拆卸。这适用于您通常包装在 using-block 中的任何类型的一次性用品。

编辑 2:根据@Panagiotis Kanavos,解决方案的执行摘要是在定义块之前设置对象,然后在块的 Completion.ContinueWith 中拆除对象

【问题讨论】:

我最好跳过 TPL 数据流并使用 n BlockingCollection/Task.Run 对吗? 你的代码到底做了什么? TPL Datflow 没有任何问题。事实上,由于默认情况下每个块仅使用 1 个任务,您甚至可以使用在该块外部创建的 FileStream。但是,如果您需要编写 1M 行,更好的 解决方案是将它们批处理并一次性编写整个批处理,而不是逐行写出 to n BufferBlock/ActionBlock 为什么?一个 ActionBlock 已经有一个输入 BufferBlock ActionBlock already has an input BufferBlock 是的,但是 BroadcastBlock 的本质是,如果 ActionBlock 落后,则无法保证交付。我将在不关心缓冲区大小的高 RAM 服务器上运行。 @amonroejj 我添加了一个函数,它确实对所有消息使用单个流,但这确实有丢失未写入数据的风险,并且确实会在管道的生命周期内锁定文件. 【参考方案1】:

即使您不必每次都打开流,一次写入一行文件本身也很昂贵。保持文件流打开还有其他问题,因为出于性能原因,文件流总是被缓冲,从FileStream 级别一直到文件系统驱动程序。您必须定期刷新流以确保数据已写入磁盘。

要真正提高性能,您必须对记录进行批处理,例如使用 BatchBlock。一旦你这样做了,打开流的成本就可以忽略不计了。

这些行也应该在最后一刻生成,以避免生成需要被垃圾回收的临时字符串。在 n*1M 记录中,这些分配和垃圾收集的内存和 CPU 开销将非常严重。

在写入之前记录库批处理日志条目以避免这种性能损失。

你可以试试这样的:

var batchBlock=new BatchBlock<Record>(1000);
var writerBlock=new ActionBlock<Record[]>( records => 
   
    //Create or open a file for appending
    using var writer=new StreamWriter(ThePath,true);
    foreach(var record in records)
    
        writer.WriteLine("0 = 1 :2",record.Prop1, record.Prop5, record.Prop2);
    

);

batchBlock.LinkTo(writerBlock,options);

或者,使用异步方法

var batchBlock=new BatchBlock<Record>(1000);
var writerBlock=new ActionBlock<Record[]>(async records => 
   
    //Create or open a file for appending
    await using var writer=new StreamWriter(ThePath,true);
    foreach(var record in records)
    
        await writer.WriteLineAsync("0 = 1 :2",record.Prop1, record.Prop5, record.Prop2);
    

);

batchBlock.LinkTo(writerBlock,options);

您可以调整批处理大小和 StreamWriter 的缓冲区大小以获得最佳性能。

创建一个写入流的实际“块”

可以使用Custom Dataflow block walkthrough 中显示的技术创建自定义块 - 无需创建实际的自定义块,而是创建返回 LinkTo 工作所需的任何内容,在本例中为 ITargetBlock&lt; T&gt;

ITargetBlock<Record> FileExporter(string path)

    var writer=new StreamWriter(path,true);
    var block=new ActionBlock<Record>(async msg=>
        await writer.WriteLineAsync("0 = 1 :2",record.Prop1, record.Prop5, record.Prop2);
    );

    //Close the stream when the block completes
    block.Completion.ContinueWith(_=>write.Close());
    return (ITargetBlock<Record>)target;

...


var exporter1=CreateFileExporter(path1);
previous.LinkTo(exporter,options);

这里的“诀窍”是流在块外创建并保持活动状态,直到块完成。它不是垃圾收集的,因为它被其他代码使用。当块完成时,无论发生什么,我们都需要显式关闭它。 block.Completion.ContinueWith(_=&gt;write.Close()); 将关闭流,无论块是否正常完成。

这与演练中使用的代码相同,用于关闭输出 BufferBlock:

target.Completion.ContinueWith(delegate

   if (queue.Count > 0 && queue.Count < windowSize)
      source.Post(queue.ToArray());
   source.Complete();
);

默认情况下,流是缓冲的,因此调用WriteLine 并不意味着数据实际上会写入磁盘。这意味着我们不知道何时将数据实际写入文件。如果应用程序崩溃,可能会丢失一些数据。

内存、IO 和开销

在很长一段时间内处理 100 万行时,事情会累加起来。可以使用例如File.AppendAllLinesAsync 一次写入一批行,但这会导致分配1M 临时字符串。在每次迭代中,运行时都必须 至少 将这些临时字符串用作批处理的 RAM。在 GC 触发冻结线程之前,RAM 使用量将开始膨胀到数百 MB,然后是 GB。

由于有 100 万行和大量数据,因此很难调试和跟踪管道中的数据。如果出现问题,事情可能会很快非常崩溃。例如,假设有 100 万条消息卡在 一个 块中,因为一条消息被阻止了。

重要的是(出于健全和性能原因)使流水线中的各个组件尽可能简单。

【讨论】:

Writing to a file one line at a time is expensive in itself even when you don't have to open the stream each time. 任何人都可以提供此声明的引用(诚实的问题,而不是恶作剧)? @amonroejj 这将是“视情况而定”的情况。对于初学者来说,“昂贵”是一个相对术语。与某些事物相比,它很昂贵,而与其他事物相比却没有。当然,接下来编写文件将很大程度上取决于实现细节。如果文件在 SSD 上,它的行为将与 HDD 或网络驱动器有很大不同(网络驱动器会因连接而有很大差异)。 @amonroejj 除非您使用缓冲,否则每次流写入都会导致 IO 操作。由于这个原因,FileStream 默认被缓冲。这会减少 IO 操作,但崩溃总是有可能导致数据丢失。长时间锁定文件也会导致其他问题。通过批处理,您可以确保数据在您期望的时候写入并且文件被释放 @amonroejj 当你有 100 万行要写时,事情加起来很快。您也失去了轻松调试和跟踪数据的能力,尤其是对于复杂的管道。这就是我没有使用WriteAllLinesAsync 的原因——这需要生成 1M 临时字符串。【参考方案2】:

通常在使用 TPL 时,我会创建自定义类,这样我就可以创建用于管道中块的私有成员变量和私有方法,但我不会实现 ITargetBlockISourceBlock,而是拥有任何东西我在自定义类中需要的块,然后我将 ITargetBlock 和/或 ISourceBlock 公开为公共属性,以便其他类可以使用源块和目标块将事物链接在一起。

【讨论】:

以上是关于TPL Dataflow:ActionBlock,避免在每次调用其委托时重复运行 using-block(例如用于写入 StreamWriter)的主要内容,如果未能解决你的问题,请参考以下文章

使用 TPL-Dataflow 进行聚合和连接(内、外、左……)?

TPL-Dataflow 是不是适用于高并发应用程序?

TPL Dataflow BufferBlock 线程安全吗?

TPL Dataflow 如何与“全局”数据同步

TPL Dataflow ,完成一个 Block ,重新创建一个 BLock

TPL DataFlow 一对一处理