在一些 .NET Parallel.ForEach() 代码中做一些异步/等待可以吗?

Posted

技术标签:

【中文标题】在一些 .NET Parallel.ForEach() 代码中做一些异步/等待可以吗?【英文标题】:Is it OK to do some async/await inside some .NET Parallel.ForEach() code? 【发布时间】:2014-07-06 22:46:21 【问题描述】:

鉴于以下代码,在Parallel.ForEach 中执行async/await 是否OK

例如。

Parallel.ForEach(names, name =>

    // Do some stuff...

    var foo = await GetStuffFrom3rdPartyAsync(name);

    // Do some more stuff, with the foo.
);

还是有一些我需要注意的问题?

编辑:不知道这是否可以编译,顺便说一句。只是伪代码.. 大声思考。

【问题讨论】:

【参考方案1】:

不,将asyncParalell.Foreach 组合起来没有意义。

考虑以下示例:

private void DoSomething()

    var names = Enumerable.Range(0,10).Select(x=> "Somename" + x);
    Parallel.ForEach(names, async(name) =>
       
        await Task.Delay(1000);
        Console.WriteLine("Name 0 completed",name);
    );
    Console.WriteLine("Parallel ForEach completed");

你会期待什么输出?

Name Somename3 completed
Name Somename8 completed
Name Somename4 completed
...
Parallel ForEach completed

这不会发生。它将输出:

Parallel ForEach completed
Name Somename3 completed
Name Somename8 completed
Name Somename4 completed
...

为什么?因为当ForEach 第一个命中await 时,该方法实际返回,Parallel.ForEach 不知道它是异步的,它运行完成!。 await 之后的代码在另一个线程上作为延续运行不是“并行处理线程”

Stephen toub addressed this here

【讨论】:

好的 - 那么你有什么建议呢? @Pure.Krome 您试图解决的问题是什么?在给定的背景下,很难回答。如果可以选择等待,您可以Task.Run(async ()=> await Something()).Wait();【参考方案2】:

根据名称,我假设 GetStuffFrom3rdPartyAsync 是 I/O 绑定的。 Parallel 类专门用于 CPU 绑定代码。

在异步世界中,您可以使用Task.WhenAll 启动多个任务,然后(异步)等待它们全部完成。由于您从一个序列开始,将每个元素投影到一个异步操作可能是最简单的,然后等待所有这些操作:

await Task.WhenAll(names.Select(async name =>

  // Do some stuff...
  var foo = await GetStuffFrom3rdPartyAsync(name);
  // Do some more stuff, with the foo.
));

【讨论】:

【参考方案3】:

一个接近的选择可能是这样的:

static void ForEach<T>(IEnumerable<T> data, Func<T, Task> func)

    var tasks = data.Select(item => 
        Task.Run(() => func(item)));

    Task.WaitAll(tasks.ToArray());


// ... 

ForEach(names, name => GetStuffFrom3rdPartyAsync(name));

理想情况下,您不应该使用像Task.WaitAll 这样的阻塞调用,如果您可以在当前调用堆栈上“一直向下”调用async 的整个方法链:

var tasks = data.Select(item => 
    Task.Run(() => func(item)));

await Task.WhenAll(tasks.ToArray());

此外,如果您在 GetStuffFrom3rdPartyAsync 内不做任何 CPU 密集型工作,Task.Run 可能是多余的:

var tasks = data.Select(item => func(item));

【讨论】:

【参考方案4】:

正如@Sriram Sakthivel 所指出的,将Parallel.ForEach 与异步lambda 一起使用存在一些问题。 Steven Toub 的ForEachASync 可以做到这一点。他谈论它here,但这里是代码:

public static class Extensions

    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate 
                                               using (partition) while (partition.MoveNext()) await body(partition.Current);
            ));
    

它使用Partitioner 类创建负载平衡分区器(doco),并允许您使用dop 参数指定要运行的线程数。看看它和Parallel.ForEach 的区别。试试下面的代码。

 class Program
    
        public static async Task GetStuffParallelForEach()
        
            var data = Enumerable.Range(1, 10);
            Parallel.ForEach(data, async i =>
            
                await Task.Delay(1000 * i);
                Console.WriteLine(i);
            );
        

        public static async Task GetStuffForEachAsync()
        
            var data = Enumerable.Range(1, 10);
            await data.ForEachAsync(5, async i =>
            
                await Task.Delay(1000 * i);
                Console.WriteLine(i);
            );

        

        static void Main(string[] args)
        
            //GetStuffParallelForEach().Wait(); // Finished printed before work is complete
            GetStuffForEachAsync().Wait(); // Finished printed after all work is done
            Console.WriteLine("Finished");
            Console.ReadLine();
        

如果您运行GetStuffForEachAsync,程序会等待所有工作完成。如果你运行GetStuffParallelForEach,那么Finished这一行将在工作完成之前被打印出来。

【讨论】:

以上是关于在一些 .NET Parallel.ForEach() 代码中做一些异步/等待可以吗?的主要内容,如果未能解决你的问题,请参考以下文章

.Net 中的多个 Parallel.ForEach 循环

在 .NET 3.5 中将 Parallel.Foreach 与分区器一起使用

VS2015升级后的垃圾收集和Parallel.ForEach问题

Asp.Net 中有没有办法与运行 Parallel.Foreach 的后台线程进行通信

如何限制 Parallel.ForEach?

Parallel.Foreach 给出错误“索引超出了数组的范围”