在 C# 中,如何处理具有多个线程/任务但有条件的大型文本文件?
Posted
技术标签:
【中文标题】在 C# 中,如何处理具有多个线程/任务但有条件的大型文本文件?【英文标题】:In C#, how do I process a large text file with multiple threads/tasks, but with conditions? 【发布时间】:2021-07-28 05:45:44 【问题描述】:我正在用 C# 编写一个文件处理程序。我有一个巨大的文本文件,有 5 列数据,每列由一个条(|)分隔。每行的第一列是一个包含人名的列,每个人都有一个唯一的名字。
它是一个非常大的文本文件,所以我想使用多个任务同时处理它。但我希望 SAME 任务处理具有相同名称的每一行,而不是不同的任务。例如,如果(部分)我的文件内容为:
Jason|BMW|354|23|1/1/2000|1:03
Jason|BMW|354|23|1/1/2000|1:03
Jason|BMW|354|23|1/1/2000|1:03
Jason|Acura|354|23|1/1/2000|1:03
Jason|BMW|354|23|1/1/2000|1:03
Jason|BMW|354|23|1/1/2000|1:03
Jason|Hyundai|392|17|1/1/2000|1:06
Mike|Infiniti|335|18|8/24/2005|7:11
Mike|Infiniti|335|18|8/24/2005|7:11
Mike|Infiniti|335|18|8/24/2005|7:11
Mike|Dodge|335|18|8/24/2005|7:18
Mike|Infiniti|335|18|8/24/2005|7:11
Mike|Infiniti|335|18|8/24/2005|7:14
然后我想要一个任务处理 ALL Jason 行,另一个任务处理 ALL Mike 行。我不希望第一个任务处理任何 Mike 行,相反我不希望第二个任务处理任何 Jason 行。本质上,我怎样才能使某个名称的所有行都由 SAME 任务处理?另外,我怎么知道所有行的所有处理都已完成?我一直在绞尽脑汁,想不出解决办法。
【问题讨论】:
只需启动几个进程。将每个“名称”行发送到相应的进程。顺便说一句,您在这里需要多个进程的可能性极小。文件有多大? 文件是否按名称排序? @AkshayNatu 什么是“巨大”?千兆字节?太字节? @tymtam 基本上,认为线程是昂贵的,因为它们是,而且它们是有限的。如果每个唯一的人名都有一个专用线程,那么您就会用完。通过提出一种将名称映射到更小、有限的线程子集的方法,您可以避免过多的上下文切换和线程的过度使用。哈希或映射到更少线程的其他算法很可能会导致更好的吞吐量。散列是一种方法。您还可以使用“桶”或“分区”名称。例如,以“A-F”开头的每个人都去这里,而以“G-K”开头的每个人都去那里,依此类推。 @Kit 全部清除。您的解释让我意识到假设文件中没有很多不同的名称(这个假设很可能是错误的)。谢谢! 【参考方案1】:我会选择一个并发队列的并发字典,按名称键入。
在主线程(称为阅读器)中,逐行循环,将行排入适当的并发队列(称为工作队列),创建新的工作队列并根据需要创建新的工作队列和专用任务遇到名字。
它看起来像这样(注意:这是半伪代码和半真实代码,没有错误检查,因此将其视为解决方案的基础,而不是 解决方案) .
class FileProcessor
private ConcurrentDictionary<string, Worker> workers = new ConcurrentDictionary<string, Worker>();
class Worker
public Worker() => Task = Task.Run(Process);
private void Process()
foreach (var row in Queue.GetConsumingEnumerable())
if (row.Length == 0) break;
ProcessRow(row);
private void ProcessRow(string[] row)
// your implementation here
public Task Task get;
public BlockingCollection<string[]> Queue get; = new BlockingCollection<string[]>(new ConcurrentQueue<string[]>());
void ProcessFile(string fileName)
foreach (var line in GetLinesOfFile(fileName))
var row = line.Split('|');
var name = row[0];
// create worker as needed
var worker = workers.GetOrAdd(name, x => new Worker());
// add a row for the worker to work on
worker.Queue.Add(row);
// send an empty array to each worker to signal end of input
foreach (var worker in workers.Values)
worker.Queue.Add(new string[0]);
// now wait for all workers to be done
Task.WaitAll(workers.Values.Select(x => x.Task).ToArray());
private static IEnumerable<string> GetLinesOfFile(string fileName)
// this helps limit memory consumption by not loading
// the whole file at once
return File.ReadLines(fileName);
我建议您的阅读器线程流式传输文件而不是读取整个文件;你说文件很大,所以流媒体对内存很友好)。那个阅读器线程是 I/O 绑定的,所以如果你可以异步/等待它,那会比我简单的Process()
做一个foreach
没有等待更好。
这种方法的特点:
每个人的名字的专用任务 使用sentinel 值表示输入结束 使用Task.WaitAll
重新加入主线程
假设任务受 CPU 限制。如果它们受 I/O 限制,请考虑改用 async/await 和 Task.WhenAll
文件通过File.ReadLines()流式传输到内存中
名称不需要排序,因为要入队的队列是按名称按需选择的
改进
为了完整起见,上面的方法有点幼稚,可以通过...阅读所有的cmets和答案来改进;用户 Zoulias 和 Mercer 尤其有优点。我们可以使用
来完善这种方法 将此调整到 TPL 频道并使用 CompleteAdding。这些不仅是更好的抽象,而且更高效(abstraction 和 efficient 经常会发生冲突,但在这种情况下并非如此)。 减少name-to-thread或name-to-task的专用性,在大量名称的情况下会耗尽资源,而是将名称映射到每个桶/分区都有一个专用任务的桶或分区/线程。例如,对于第二点,您可以有
// create worker as needed
var worker = workers.GetOrAdd(GetPartitionKey(name), x => new Worker());
GetPartitionKey()
的实现方式类似于
private string GetPartitionKey(string name) =>
name[0] switch
>= 'a' and <= 'f' => "A thru F bucket",
>= 'A' and <= 'F' => "A thru F bucket",
>= 'g' and <= 'k' => "G thru K bucket",
>= 'G' and <= 'K' => "G thru K bucket",
_ => "everything else bucket"
或任何您想用作分区选择器的算法。
【讨论】:
File.ReadLines
将一次流式传输一行,不管它的价值。可以用它代替GetLinesOfFile
@RufusL 谢谢!不知道我的头顶上。更新了答案。
非常感谢!我会试试这个,让你知道。
我应该说“它允许您在返回整个集合之前开始枚举集合”更准确:)
嗯,是的。我说它逐行流动可能是错误的。为了准确起见,我将再次更新我的答案。流的缓冲可能比一行更细!【参考方案2】:
一个想法是实现producer-consumer 模式,一个生产者逐行读取文件,多个消费者处理这些行,每个名称一个消费者。由于唯一名称的数量可能很大,为每个消费者指定一个Thread
是不切实际的,因此消费者应该异步处理数据。每个消费者都应该有自己的私有队列,其中包含要处理的数据。当前在 .NET 中可用的最有效的异步队列是 Channel<T>
类,将其用作构建块将是一个好主意,但我会建议更高级别的东西:ActionBlock<T>
来自 TPL Dataflow图书馆。该组件结合了处理器和队列,支持异步,并且高度可配置。因此,这将是一个简洁、易读且希望非常有效的解决方案:
var processors = new Dictionary<string, ActionBlock<string>>();
foreach (var line in File.ReadLines(filePath))
string name = ExtractName(line); // Reads the first part of the line
if (!processors.TryGetValue(name, out ActionBlock<string> processor))
processor = CreateProcessor(name);
processors.Add(name, processor);
var accepted = processor.Post(line);
if (!accepted) break; // The processor has failed
// Signal that no more lines will be sent to the processors
foreach (var processor in processors.Values) processor.Complete();
// Aggregate the completion of all processors
Task allCompletions = Task.WhenAll(processors.Values.Select(p => p.Completion));
// Wait for the completion of all processors, and allow errors to propagate
allCompletions.Wait(); // or await allCompletions;
static ActionBlock<string> CreateProcessor(string name)
return new ActionBlock<string>((string line) =>
// Process the line
, new ExecutionDataflowBlockOptions()
// Configure the options if the defaults are not optimal
);
【讨论】:
这也是一个很好的答案。 TPL 数据流更现代一点,不必使用哨兵也不错。 @Kit TPL 数据流现在至少是10 years old,所以它不完全是尖端技术。 Channels 是镇上的新人! 嗯。几十年……抽象……哈哈【参考方案3】:我怎样才能使某个名称的所有行都由 SAME 任务处理?
可以使用各种TaskCreationOptions
创建System.Threading.Task
,这些TaskCreationOptions
规定了线程和资源在其生命周期内的管理方式和时间。对于消耗大量数据并进一步将数据消耗隔离到特定线程的操作 - 您可能需要考虑使用选项 TaskCreationOptions.LongRunning
which may provide a hint to the task scheduler that an additional thread might be required for the task so that it does not block the forward progress of other threads or work items on the local thread-pool queue. 创建负责各个名称的任务
对于实际的方法,我建议启动各种“Worker”线程,每个线程都有自己的Task
和主task
(读取文件或解析 JSON 数据的那个)之间进行通信的方式这两个需要完成更多的工作。
考虑使用线程安全的集合,例如 ConcurrentQueue<T>
或 other various collections,它们可以帮助您在线程之间流式传输数据以安全使用。
这是您可能要考虑的结构的一个非常有限的示例:
void Worker(ConcurrentQueue<string> Queue, CancellationToken Token)
// keep the worker in a
while (Token.IsCancellationRequested is false)
// check to see if the queue has stuff, and consume it
if (Queue.TryDequeue(out string line))
Console.WriteLine($"Consumed Line line Thread.CurrentThread.ManagedThreadId");
// yield the thread incase other threads have work to do
Thread.Sleep(10);
Console.WriteLine("Finished Work");
// data could be a reader, list, array anything really
IEnumerable<string> Data()
yield return "JASON";
yield return "Mike";
yield return "JASON";
yield return "Mike";
void Reader()
// create some collections to stream the data to other tasks
ConcurrentQueue<string> Jason = new();
ConcurrentQueue<string> Mike = new();
// make sure we have a way to cancel the workers if we need to
CancellationTokenSource tokenSource = new();
// start some worker tasks that will consume the data
Task[] workers =
new Task(()=> Worker(Jason, tokenSource.Token), TaskCreationOptions.LongRunning),
new Task(()=> Worker(Mike, tokenSource.Token), TaskCreationOptions.LongRunning)
;
for (int i = 0; i < workers.Length; i++)
workers[i].Start();
// iterate the data and send it off to the queues for consumption
foreach (string line in Data())
switch (line)
case "JASON":
Console.WriteLine($"Sent line to JASON Thread.CurrentThread.ManagedThreadId");
Jason.Enqueue(line);
break;
case "Mike":
Console.WriteLine($"Sent line to Mike Thread.CurrentThread.ManagedThreadId");
Mike.Enqueue(line);
break;
default:
Console.WriteLine($"Disposed unknown line Thread.CurrentThread.ManagedThreadId");
break;
// make sure that worker threads are cancelled if parent task has been cancelled
try
// wait for workers to finish by checking collections
do
Thread.Sleep(10);
while (Jason.IsEmpty is false && Mike.IsEmpty is false);
finally
// cancel the worker threads, if they havent already
tokenSource.Cancel();
// make sure we have a way to cancel the reader if we need to
CancellationTokenSource tokenSource = new();
// start the reader thread
Task[] tasks = Task.Run(Reader, tokenSource.Token) ;
Console.WriteLine("Starting Reader");
Task.WaitAll(tasks);
Console.WriteLine("Finished Reader");
// cleanup the tasks if they are still running some how
tokenSource?.Cancel();
// dispose of IDisposable Object
tokenSource?.Dispose();
Console.ReadLine();
【讨论】:
OP 声称这是一个非常大的文件。用户可能不仅仅是 Mike 和 Jason。像这样对队列进行硬编码似乎有问题。 当然。提供我明确简单的示例的目的不是为 OP 提供完整的实现,而是指出他们在完全实现项目时可能要考虑的一些有用功能和一般结构。我想避免抽象划分工作线程的核心概念。以上是关于在 C# 中,如何处理具有多个线程/任务但有条件的大型文本文件?的主要内容,如果未能解决你的问题,请参考以下文章