如何优化读取文件列表并将它们存储在数据库中?

Posted

技术标签:

【中文标题】如何优化读取文件列表并将它们存储在数据库中?【英文标题】:How to optimize reading a list of files and storing them in a database? 【发布时间】:2021-10-03 23:51:38 【问题描述】:

我最近在一次采访中被问到一个问题,这真的让我思考。

我正在努力了解和了解有关多线程、并行性和并发性以及性能的更多信息。

场景是您有一个文件路径列表。文件保存在您的 HDD 或 Blob 存储上。 您已阅读文件并将它们存储在数据库中。你会如何以最优化的方式做到这一点?

以下是我能想到的一些方法:

最简单的方法是遍历列表并按顺序执行此任务。

Foreach(var filePath in filePaths)

  ProcessFile(filePath);


public void ProcessFile(string filePath)

  var file = readFile(filePath);
  storeInDb(file);

我能想到的第二种方法可能是创建多个线程:

Foreach(var filePath in filePaths)

Thread t  = new Thread(ProcessFIle(filePath));
t.Start();


(not sure if the above code is correct.)

第三种方式是使用异步等待

List<Tasks> listOfTasks;
Foreach(var filePath in filePaths)

  var task = ProcessFile(filePath);
  listOfTasks.Add(task);

Task.WhenAll(listOftasks);

public async void ProcessFile(string filePath)

  var file = readFile(filePath);
  storeInDb(file);

第四种方式是并行。对于:

Parallel.For(0,filePaths.Count , new ParallelOptions  MaxDegreeOfParallelism = 10 , i =>
    
        ProcessFile(filePaths[i]);
    );

它们之间有什么区别。哪一个更适合这份工作,还有什么更好的吗?

【问题讨论】:

您只是将文件本身移动到数据库中,还是解析文件的内容,例如 .csv 并将内容发送到数据库。 @DekuDesu 是的,您正在解析文件的内容。 你可以看看这个问题:Parallel.ForEach vs Task.Run and Task.WhenAll。它可能会直接回答您的问题。顺便说一句,您在问题中提到的选项都不是最佳选项。仅使用数据并行性不会获得最佳性能。您还需要任务并行性。有一个例子here。 您的第三种方式,“异步等待”方法的编码非常糟糕。它实际上并没有使用await,而async 应该是async void,而它应该是async Task。最好运行自己的 Task.Run 调用,以确保它被推送到后台任务。 【参考方案1】:

您还可以使用 Microsoft 的反应式框架(又名 Rx)- NuGet System.Reactive 并添加 using System.Reactive.Linq; - 然后您可以这样做:

IObservable<string> query =
    from filePath in filePaths.ToObservable()
    from file in Observable.Start(() => ReadFile(filePath))
    from db in Observable.Start(() => StoreInDb(file))
    select filePath;

IDisposable subscription =
    query
        .Subscribe(
            filePath => Console.WriteLine($"filePath Processed."),
            () => Console.WriteLine("Done."));

【讨论】:

问题更多的是性能优化。 @SamuraiJack - 这确实可以处理。这一切都是用多个线程完成的,它会自动平衡线程池的使用。【参考方案2】:

我写了一个简单的扩展方法来帮助启动异步任务,限制并发量,并等待它们全部完成;

public static async Task WhenAll(this IEnumerable<Task> tasks, int batchSize)

    var started = new List<Task>();

    foreach(var t in tasks)
    
        started.Add(t);
        if (started.Count >= batchSize)
        
            var ended = await Task.WhenAny(started);
            started.Remove(ended);
        
    
    await Task.WhenAll(started);

然后您需要一种将文件内容直接流式传输到数据库的方法。例如;

async Task Process(string filename)
    using var stream = File.OpenRead(filename)

    // TODO connect to the database
    var sqlCommand = ...;
    sqlCommand.CommandText = "update [table] set [column] = @stream";
    sqlCommand.Parameters.Add(new SqlParameter("@stream", SqlDbType.VarBinary)
    
        Value = stream
    );
    await sqlCommand.ExecuteNonQueryAsync();

IEnumerable<string> files = ...;
await files.Select(f => Process(f)).WhenAll(20);

这是最好的方法吗?可能不是。因为这个扩展太容易被滥用了。多次意外启动任务,或一次启动所有任务。

【讨论】:

为此,IEnumerable&lt;Task&gt; tasks 不应是物化集合。您可以添加一些参数验证代码,例如if (tasks is ICollection&lt;Task&gt;) throw...,但总体而言,由于您提到的原因,这不是一个好的解决方案。最后的Task.WhenAll还有一个bug,只会等待最后的batchSize任务,之前的任务抛出的任何异常都会被吞掉。一般来说,用于限制的正确工具是SemaphoreSlim,而不是Task.WhenAny

以上是关于如何优化读取文件列表并将它们存储在数据库中?的主要内容,如果未能解决你的问题,请参考以下文章

Python:如何读取文件并将某些列存储在数组中

如何使用 lua 读取文件夹名称并将它们放入表列表中

Java读取列文件中具有不同数字的txt并将数据存储在arraylist中

C#读取文本文件并将值存储在不同的列表中

将drawable中的图像作为文件读取

使用十六进制数据读取文件并将其存储到python中的列表中