如何线程安全地将并行进程中的数据收集到单个对象中?

Posted

技术标签:

【中文标题】如何线程安全地将并行进程中的数据收集到单个对象中?【英文标题】:How can I thread safely collect data from parallel processes into a single object? 【发布时间】:2021-01-04 18:20:59 【问题描述】:

我有以下发送多条短信的功能:

BulkSMSSenderResult bulkResult = new BulkSMSSenderResult();

if (BulkRequest.Requests.Any()) 
    IEnumerable<(SMSSenderRequest, Task<Nito.Try<SMSSenderResult>>)> sendSmsTasks 
        = BulkRequest.Requests.Select(request => (request, SendSingleSmsAsync(request)));
    await Task.WhenAll(sendSmsTasks.Select(task => task.Item2));

    sendSmsTasks.ToList()
        .ForEach(task => 
            (SMSSenderRequest request, Task<Nito.Try<SMSSenderResult>> tryResult) = task;
            _ = tryResult.Result.Match<Either<ErrorMessage, SMSSenderResult>>(
                exception => new ErrorMessage(exception, request),
                value => value
            )
            .Match(
                result => bulkResult.Add(result),
                error => bulkResult.Add(error)
            );
        );


if (BulkRequest.BadRequests.Any()) 
    bulkResult.InvalidRequests = BulkRequest.BadRequests;


WriteResponseAsync(context, StatusCodes.Status207MultiStatus, bulkResult);

几乎按预期工作,但似乎所有 SMS 都发送了两次。

我认为问题可能出在这一行:

await Task.WhenAll(sendSmsTasks.Select(task => task.Item2));

我的期望是这一行应该检查 SMS 是否已发送,以便后面的代码可以安全执行。

但是,这行代码和后面的代码似乎都导致SendSingleSmsAsync(request) 执行...或者其他原因(我无法推测)导致了这项工作触发两次(我确信SendSingleSmsAsync(request) 本身工作正常)。

任何想法如何解决这个问题?

【问题讨论】:

【参考方案1】:

你的问题出在这句话上:

IEnumerable<(SMSSenderRequest, Task<Nito.Try<SMSSenderResult>>)> sendSmsTasks 
                = BulkRequest.Requests.Select(request => (request, SendSingleSmsAsync(request)));

Select(request =&gt; (request, SendSingleSmsAsync(request)) 中,Select 是一个投影,因此每次枚举该查询时都会对其进行评估。

换句话说,您将在这些地方有一个 (request, SendSingleSmsAsync(request)) 对:

sendSmsTasks.Select(task =&gt; task.Item2),和 sendSmsTasks.ToList()

由于SendSingleSmsAsync 是一个返回Task 的方法调用,因此您最终会执行两次操作。

你可以很容易地解决这个问题:

var sendSmsTasks = BulkRequest.Requests
    .Select(request => (request, SendSingleSmsAsync(request)))
    .ToArray();

但我强烈建议您深入了解您的代码,因为它变得非常复杂。另外,.ToList().ForEach() 完全浪费资源(额外分配的内存,额外循环的时间),所以将其更改为简单的foreach

【讨论】:

【参考方案2】:

您必须将可枚举的 Linq 操作更多地视为设置要执行的小迷你程序,而不是实际执行和运行它们。

在您的情况下,您在此处设置计算:

IEnumerable<(SMSSenderRequest, Task<Nito.Try<SMSSenderResult>>)> sendSmsTasks 
                    = BulkRequest.Requests.Select(request => (request, SendSingleSmsAsync(request)))

但是你实际上经历并执行了两次 - 在这里:

await Task.WhenAll(sendSmsTasks.Select(task => task.Item2));

这里:

sendSmsTasks.ToList()

解决方法是尽快“实现”可枚举,以便从那时起您处理的是实际数据,而不是可链接的、懒惰的、可能的东西。

尝试在可枚举声明的末尾添加.ToArray()

【讨论】:

以上是关于如何线程安全地将并行进程中的数据收集到单个对象中?的主要内容,如果未能解决你的问题,请参考以下文章

线程的5个重要概念

多线程与线程池

进程队列补充socket实现服务器并发线程完结

基础篇之多线程总结

如何在并行 JVM 进程(不是线程)中运行 TestNG 测试

如何将项目添加到字典“并行循环安全”