如何使用 Parallel.ForEach 正确写入文件?

Posted

技术标签:

【中文标题】如何使用 Parallel.ForEach 正确写入文件?【英文标题】:How to correctly write to a file using Parallel.ForEach? 【发布时间】:2016-05-24 05:44:11 【问题描述】:

我有一个任务,它逐行读取一个大文件,用它做一些逻辑,然后返回一个我需要写入文件的字符串。输出的顺序无关紧要。但是,当我尝试下面的代码时,它会在读取 15-20k 行文件后停止/变得非常慢。

public static Object FileLock = new Object();
...
Parallel.ForEach(System.IO.File.ReadLines(inputFile), (line, _, lineNumber) =>

    var output = MyComplexMethodReturnsAString(line);
    lock (FileLock)
    
        using (var file = System.IO.File.AppendText(outputFile))
        
            file.WriteLine(output);
        
    
);

为什么我的程序运行一段时间后变慢了?是否有更正确的方法来执行此任务?

【问题讨论】:

是否需要输出行的顺序与输入的顺序相对应?如果是这样,Parallel.ForEach 不是正确的工具。 我不确定,但感觉以这种方式使用并行正在创建/恶化 IO 瓶颈,而不是避免它。除非您在这些线路上进行非常昂贵的操作.. var file = System.IO.File.AppendText(outputFile) 可以放在 foreach 之外,因为您正在锁定它。检查这是否会提高性能。 你有一个用于同步的“锁”,以确保只有一个线程可以写入文件。这肯定会减慢速度,因为它会按顺序限制写操作。多个线程会一直等待,直到文件被第一个线程写入。 你的代码相当于说你需要用一把剪刀修剪你的草坪,但是,不要单独做(因为这会花很长时间),你会得到 100 个朋友帮你,但你说他们都得共用一把剪刀。 【参考方案1】:

您实际上是通过让所有线程尝试写入文件来序列化您的查询。相反,您应该计算需要编写的内容,然后在它们结束时编写它们。

var processedLines = File.ReadLines(inputFile).AsParallel()
    .Select(l => MyComplexMethodReturnsAString(l));
File.AppendAllLines(outputFile, processedLines);

如果您需要刷新数据,请打开一个流并启用自动刷新(或手动刷新):

var processedLines = File.ReadLines(inputFile).AsParallel()
    .Select(l => MyComplexMethodReturnsAString(l));
using (var output = File.AppendText(outputFile))

    output.AutoFlush = true;
    foreach (var processedLine in processedLines)
        output.WriteLine(processedLine);

【讨论】:

如果文件真的是文件,我不确定这种方法是否足够,因为第一步需要读取整个文件。 不是在使用File.ReadLines() 时,它会为您提供一个枚举值,该枚举值将在读取文件的行时进行枚举。这与使用 File.ReadAllLines() 形成对比,后者返回一个包含文件所有行的数组。 读入整个文件。【参考方案2】:

这与Parallel.ForEach 的内部负载平衡器的工作方式有关。当它看到你的线程花费大量时间阻塞时,它的理由是它可以通过向问题抛出更多线程来加快处理速度,从而导致更高的并行开销、FileLock 的争用和整体性能下降。

为什么会这样?因为Parallel.ForEach 不适合 IO 工作。

你怎么能解决这个问题? Parallel.ForEach 仅用于 CPU 工作,并在并行循环之外执行所有 IO。

一种快速的解决方法是通过使用接受ParallelOptions 的重载来限制Parallel.ForEach 允许登记的线程数,如下所示:

Parallel.ForEach(
    System.IO.File.ReadLines(inputFile),
    new ParallelOptions  MaxDegreeOfParallelism = Environment.ProcessorCount ,
    (line, _, lineNumber) =>
    
        ...
    

【讨论】:

在“快速解决方法...”之前,我真的很喜欢您的回答。与你之前所说的一切相比,这似乎是一个倒退。也许如果你充实代码对我来说会更有意义。 好奇:我一直认为 Environment.ProcessorCount 是 MaxDegreeOfParallelism 的自然限制。有错吗? @TaW,不,它会远远超出Environment.ProcessorCount。这是一个小提琴,它显示每秒添加大约 1 个线程,直到你终止进程(我在 100 之后放弃了):dotnetfiddle.net/dT1eBM(不用说,你可能不应该在生产服务器上运行它)

以上是关于如何使用 Parallel.ForEach 正确写入文件?的主要内容,如果未能解决你的问题,请参考以下文章

Parallel.ForEach 具有可枚举的 KeyValuePairs?

提高性能异步 Parallel.Foreach

计算 Parallel.ForEach 使用的线程数

在 parallel.ForEach 循环中获取线程 ID

如何限制 Parallel.ForEach?

如何从 Parallel.ForEach 收集返回值?