C# Parallel.ForEach 和 Task.WhenAll 有时返回的值比预期的少

Posted

技术标签:

【中文标题】C# Parallel.ForEach 和 Task.WhenAll 有时返回的值比预期的少【英文标题】:C# Parallel.ForEach and Task.WhenAll sometimes returning less values then supposed 【发布时间】:2021-06-11 20:49:03 【问题描述】:

我有这个:

Parallel.ForEach(numbers, (number) =>

    var value = Regex.Replace(number, @"\s+", "%20");

    tasks.Add(client.GetAsync(url + value));
);

await Task.WhenAll(tasks).ConfigureAwait(false);

foreach (var task in tasks)

  ...

有时到达foreach(var task in tasks)时返回的任务较少,但在几次请求后,开始返回所有任务。

我已将 ConfigureAwait 更改为 true,但有时仍会返回较少的任务。

顺便说一句,我使用 Parallel.ForEach 因为每个 client.GetAsync(url + value) 都是对外部 api 的请求,其特殊性在于 99% 的请求的延迟 SLA 低于 1s

你们能解释一下为什么它有时会返回更少的任务吗?

有没有办法保证总是返回所有任务?

谢谢

【问题讨论】:

我敢打赌你用System.Collections.Generic.List 代替tasks。这个集合不是线程安全的。您必须使用线程安全的集合。见System.Collections.Concurrent命名空间 我认为你不需要Parallel.ForEach。使用异步,文档将被并行下载。 Parallel.ForEach 的意义何在?您没有在其中执行任何类型的工作。只需使用普通的foreach 循环即可将所有任务添加到列表中。那么您将不会遇到上述评论中描述的问题。 不行,你可以将任务收集到本地列表中,然后调用WhenAll 你没有在等待 client.GetAsync 所以它应该几乎立即返回,所以我不知道你的说法有什么意义。放弃 Parallel 或切换到线程安全的集合(这将产生自己的最小影响) 【参考方案1】:

有没有办法保证总是返回所有任务?

cmets 中的几个人指出你应该这样做,假设 numbers 是一个非线程安全列表:

    foreach(var number in numbers)
    
        var value = Regex.Replace(number, @"\s+", "%20");

        tasks.Add(client.GetAsync(url + value));
    

    await Task.WhenAll(tasks).ConfigureAwait(false);

    foreach (var task in tasks)
    
      ...
    

并行创建执行下载的任务似乎没有任何明显的好处;这发生得很快。等待下载完成在WhenAll

ps;有多种更复杂的方法可以为 URL 转义数据,但如果您特别希望将任何类型的空格转换为 %20,我想用正则表达式来做是有意义的..

编辑;你问什么时候使用 Parallel ForEach,我会说“一般来说不要,因为你必须更加小心使用它的上下文”,但是如果你让 Parallel.ForEach 做得更多同步工作,这可能是有道理的:

    Parallel.ForEach(numbers, number =>
    
        var value = Regex.Replace(number, @"\s+", "%20");

        var r = client.Get(url + value));

        //do something meaningful with r here, i.e. whatever ... is in your  foreach (var task in tasks)

    );

但请注意,如果您出于协调目的从主体内部对某些共享事物执行更新,那么它需要是线程安全的

【讨论】:

现在就去试试吧,并发收集吧? 不需要并发采集;同时没有任何事情发生;它在foreach 中按顺序发生。最关键的部分是你不要在foreach中await(你没有,但我说“不要试图添加它”),否则请求的IO 按顺序完成 它现在正在工作,所有任务都以与并行 foreach 相同的速度返回并且没有 cuncurrent 集合,将进行更多测试,然后接受这个答案:) @pinkfloydx33 我还是 CW 的,因为它实际上只是在想象其他人在说什么,所以我不觉得这是“我的答案”.. 但谢谢! :) @MarchalPT 当你有 CPU 密集型的工作要做,并且你想使用机器的多个内核来加快速度时,你应该使用 Parallel.ForEach。这种方法有点原始,它有一个有问题的默认并行度,一般不太可能把你扔到成功的坑里,除非你很清楚自己在做什么。 PLINQ 是一种更安全的并行工作工具。它类似于 LINQ,但以 .AsParallel() 开头。例如:Task[] tasks = urls.AsParallel().AsOrdered().Select(url => client.GetAsync(url)).ToArray();【参考方案2】:

您还没有显示它,所以我们只能猜测,但我假设tasksList<>。此集合类型不是线程安全的;您的并行循环可能会“覆盖”值。执行手动锁定列表或切换到线程安全集合,例如 ConcurrentQueue<>

var tasks = new ConcurrentQueue<Task<string>>();

Parallel.ForEach(numbers, number =>

    var value = Regex.Replace(number, @"\s+", "%20");
    tasks.Enqueue(client.GetAsync(url + value));
);

await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false);

foreach (var task in tasks)

   // whatever 

也就是说,您对Parallel.ForEach 的使用非常可疑。您没有在循环内执行任何具有实际意义的操作。使用Parallel,尤其是在适当锁定的情况下,可能会产生更高的开销,从而抵消您声称观察到或通过并行化Regex 调用实现的任何潜在收益。我会将其转换为普通的 foreach 循环并预编译 Regex 以抵消(部分)其开销:

// in class
private static readonly Regex SpaceRegex = new Regex(@"\s+", RegexOptions.Compiled);

// in method
var tasks = new List<Task<string>>();

foreach (var number in numbers)

    var value = SpaceRegex.Replace(number, "%20");
    tasks.Add(client.GetAsync(url + value));


await Task.WhenAll(tasks).ConfigureAwait(false);

foreach (var task in tasks)

   // whatever 

或者,根本不要使用正则表达式。使用适当的 Uri 转义机制,它不仅可以修复空格,还能带来额外的好处:

var value = Uri.EscapeDataString(number);
// or
var fullUri = Uri.EscapeUriString(url + number);

注意这里有两种不同的方法。正确的使用取决于urlnumber 的值。还有其他机制,例如 HttpUtility.UrlEncode 方法...但我认为这些是首选的。

【讨论】:

喜欢你使用正则表达式的方式 现在去试试,对带有https://的url有效吗? @MarchalPT 参见示例:tio.run/… 投反对票仅仅是因为建议将 very specialized ConcurrentBag&lt;T&gt; 类作为容器。此类适用于极为罕见的混合生产者-消费者场景。存储Parallel.ForEach 循环结果的更好选择是ConcurrentQueue&lt;T&gt; @TheodorZoulias 现在开心吗?

以上是关于C# Parallel.ForEach 和 Task.WhenAll 有时返回的值比预期的少的主要内容,如果未能解决你的问题,请参考以下文章

C#的并发循环(for,foreach,parallel.for,parallel.foreach)对比

Python 中的 C# Parallel.Foreach 等效项

C#并发实战Parallel.ForEach使用

我在 Parallel.ForEach 循环中收到 TaskCanceledException,如何解决?

如何通过 Parallel.ForEach 实现最大并行度并利用最大 CPU?

c# 异步编程学习笔记