如何优化读取文件列表并将它们存储在数据库中?
Posted
技术标签:
【中文标题】如何优化读取文件列表并将它们存储在数据库中?【英文标题】:How to optimize reading a list of files and storing them in a database? 【发布时间】:2021-10-03 23:51:38 【问题描述】:我最近在一次采访中被问到一个问题,这真的让我思考。
我正在努力了解和了解有关多线程、并行性和并发性以及性能的更多信息。
场景是您有一个文件路径列表。文件保存在您的 HDD 或 Blob 存储上。 您已阅读文件并将它们存储在数据库中。你会如何以最优化的方式做到这一点?
以下是我能想到的一些方法:
最简单的方法是遍历列表并按顺序执行此任务。
Foreach(var filePath in filePaths)
ProcessFile(filePath);
public void ProcessFile(string filePath)
var file = readFile(filePath);
storeInDb(file);
我能想到的第二种方法可能是创建多个线程:
Foreach(var filePath in filePaths)
Thread t = new Thread(ProcessFIle(filePath));
t.Start();
(not sure if the above code is correct.)
第三种方式是使用异步等待
List<Tasks> listOfTasks;
Foreach(var filePath in filePaths)
var task = ProcessFile(filePath);
listOfTasks.Add(task);
Task.WhenAll(listOftasks);
public async void ProcessFile(string filePath)
var file = readFile(filePath);
storeInDb(file);
第四种方式是并行。对于:
Parallel.For(0,filePaths.Count , new ParallelOptions MaxDegreeOfParallelism = 10 , i =>
ProcessFile(filePaths[i]);
);
它们之间有什么区别。哪一个更适合这份工作,还有什么更好的吗?
【问题讨论】:
您只是将文件本身移动到数据库中,还是解析文件的内容,例如 .csv 并将内容发送到数据库。 @DekuDesu 是的,您正在解析文件的内容。 你可以看看这个问题:Parallel.ForEach vs Task.Run and Task.WhenAll。它可能会直接回答您的问题。顺便说一句,您在问题中提到的选项都不是最佳选项。仅使用数据并行性不会获得最佳性能。您还需要任务并行性。有一个例子here。 您的第三种方式,“异步等待”方法的编码非常糟糕。它实际上并没有使用await
,而async
应该是async void
,而它应该是async Task
。最好运行自己的 Task.Run
调用,以确保它被推送到后台任务。
【参考方案1】:
您还可以使用 Microsoft 的反应式框架(又名 Rx)- NuGet System.Reactive
并添加 using System.Reactive.Linq;
- 然后您可以这样做:
IObservable<string> query =
from filePath in filePaths.ToObservable()
from file in Observable.Start(() => ReadFile(filePath))
from db in Observable.Start(() => StoreInDb(file))
select filePath;
IDisposable subscription =
query
.Subscribe(
filePath => Console.WriteLine($"filePath Processed."),
() => Console.WriteLine("Done."));
【讨论】:
问题更多的是性能优化。 @SamuraiJack - 这确实可以处理。这一切都是用多个线程完成的,它会自动平衡线程池的使用。【参考方案2】:我写了一个简单的扩展方法来帮助启动异步任务,限制并发量,并等待它们全部完成;
public static async Task WhenAll(this IEnumerable<Task> tasks, int batchSize)
var started = new List<Task>();
foreach(var t in tasks)
started.Add(t);
if (started.Count >= batchSize)
var ended = await Task.WhenAny(started);
started.Remove(ended);
await Task.WhenAll(started);
然后您需要一种将文件内容直接流式传输到数据库的方法。例如;
async Task Process(string filename)
using var stream = File.OpenRead(filename)
// TODO connect to the database
var sqlCommand = ...;
sqlCommand.CommandText = "update [table] set [column] = @stream";
sqlCommand.Parameters.Add(new SqlParameter("@stream", SqlDbType.VarBinary)
Value = stream
);
await sqlCommand.ExecuteNonQueryAsync();
IEnumerable<string> files = ...;
await files.Select(f => Process(f)).WhenAll(20);
这是最好的方法吗?可能不是。因为这个扩展太容易被滥用了。多次意外启动任务,或一次启动所有任务。
【讨论】:
为此,IEnumerable<Task> tasks
不应是物化集合。您可以添加一些参数验证代码,例如if (tasks is ICollection<Task>) throw...
,但总体而言,由于您提到的原因,这不是一个好的解决方案。最后的Task.WhenAll
还有一个bug,只会等待最后的batchSize
任务,之前的任务抛出的任何异常都会被吞掉。一般来说,用于限制的正确工具是SemaphoreSlim
,而不是Task.WhenAny
。以上是关于如何优化读取文件列表并将它们存储在数据库中?的主要内容,如果未能解决你的问题,请参考以下文章