如何线程安全地将并行进程中的数据收集到单个对象中?
Posted
技术标签:
【中文标题】如何线程安全地将并行进程中的数据收集到单个对象中?【英文标题】:How can I thread safely collect data from parallel processes into a single object? 【发布时间】:2021-01-04 18:20:59 【问题描述】:我有以下发送多条短信的功能:
BulkSMSSenderResult bulkResult = new BulkSMSSenderResult();
if (BulkRequest.Requests.Any())
IEnumerable<(SMSSenderRequest, Task<Nito.Try<SMSSenderResult>>)> sendSmsTasks
= BulkRequest.Requests.Select(request => (request, SendSingleSmsAsync(request)));
await Task.WhenAll(sendSmsTasks.Select(task => task.Item2));
sendSmsTasks.ToList()
.ForEach(task =>
(SMSSenderRequest request, Task<Nito.Try<SMSSenderResult>> tryResult) = task;
_ = tryResult.Result.Match<Either<ErrorMessage, SMSSenderResult>>(
exception => new ErrorMessage(exception, request),
value => value
)
.Match(
result => bulkResult.Add(result),
error => bulkResult.Add(error)
);
);
if (BulkRequest.BadRequests.Any())
bulkResult.InvalidRequests = BulkRequest.BadRequests;
WriteResponseAsync(context, StatusCodes.Status207MultiStatus, bulkResult);
这几乎按预期工作,但似乎所有 SMS 都发送了两次。
我认为问题可能出在这一行:
await Task.WhenAll(sendSmsTasks.Select(task => task.Item2));
我的期望是这一行应该检查 SMS 是否已发送,以便后面的代码可以安全执行。
但是,这行代码和后面的代码似乎都导致SendSingleSmsAsync(request)
执行...或者其他原因(我无法推测)导致了这项工作触发两次(我确信SendSingleSmsAsync(request)
本身工作正常)。
任何想法如何解决这个问题?
【问题讨论】:
【参考方案1】:你的问题出在这句话上:
IEnumerable<(SMSSenderRequest, Task<Nito.Try<SMSSenderResult>>)> sendSmsTasks
= BulkRequest.Requests.Select(request => (request, SendSingleSmsAsync(request)));
在Select(request => (request, SendSingleSmsAsync(request))
中,Select
是一个投影,因此每次枚举该查询时都会对其进行评估。
换句话说,您将在这些地方有一个新 (request, SendSingleSmsAsync(request))
对:
sendSmsTasks.Select(task => task.Item2)
,和
sendSmsTasks.ToList()
由于SendSingleSmsAsync
是一个返回Task
的方法调用,因此您最终会执行两次操作。
你可以很容易地解决这个问题:
var sendSmsTasks = BulkRequest.Requests
.Select(request => (request, SendSingleSmsAsync(request)))
.ToArray();
但我强烈建议您深入了解您的代码,因为它变得非常复杂。另外,.ToList().ForEach()
完全浪费资源(额外分配的内存,额外循环的时间),所以将其更改为简单的foreach
【讨论】:
【参考方案2】:您必须将可枚举的 Linq 操作更多地视为设置要执行的小迷你程序,而不是实际执行和运行它们。
在您的情况下,您在此处设置计算:
IEnumerable<(SMSSenderRequest, Task<Nito.Try<SMSSenderResult>>)> sendSmsTasks
= BulkRequest.Requests.Select(request => (request, SendSingleSmsAsync(request)))
但是你实际上经历并执行了两次 - 在这里:
await Task.WhenAll(sendSmsTasks.Select(task => task.Item2));
这里:
sendSmsTasks.ToList()
解决方法是尽快“实现”可枚举,以便从那时起您处理的是实际数据,而不是可链接的、懒惰的、可能的东西。
尝试在可枚举声明的末尾添加.ToArray()
。
【讨论】:
以上是关于如何线程安全地将并行进程中的数据收集到单个对象中?的主要内容,如果未能解决你的问题,请参考以下文章