在一些 .NET Parallel.ForEach() 代码中做一些异步/等待可以吗?
Posted
技术标签:
【中文标题】在一些 .NET Parallel.ForEach() 代码中做一些异步/等待可以吗?【英文标题】:Is it OK to do some async/await inside some .NET Parallel.ForEach() code? 【发布时间】:2014-07-06 22:46:21 【问题描述】:鉴于以下代码,在Parallel.ForEach
中执行async/await
是否OK?
例如。
Parallel.ForEach(names, name =>
// Do some stuff...
var foo = await GetStuffFrom3rdPartyAsync(name);
// Do some more stuff, with the foo.
);
还是有一些我需要注意的问题?
编辑:不知道这是否可以编译,顺便说一句。只是伪代码.. 大声思考。
【问题讨论】:
【参考方案1】:不,将async
与Paralell.Foreach
组合起来没有意义。
考虑以下示例:
private void DoSomething()
var names = Enumerable.Range(0,10).Select(x=> "Somename" + x);
Parallel.ForEach(names, async(name) =>
await Task.Delay(1000);
Console.WriteLine("Name 0 completed",name);
);
Console.WriteLine("Parallel ForEach completed");
你会期待什么输出?
Name Somename3 completed
Name Somename8 completed
Name Somename4 completed
...
Parallel ForEach completed
这不会发生。它将输出:
Parallel ForEach completed
Name Somename3 completed
Name Somename8 completed
Name Somename4 completed
...
为什么?因为当ForEach
第一个命中await
时,该方法实际返回,Parallel.ForEach
不知道它是异步的,它运行完成!。 await
之后的代码在另一个线程上作为延续运行不是“并行处理线程”
Stephen toub addressed this here
【讨论】:
好的 - 那么你有什么建议呢? @Pure.Krome 您试图解决的问题是什么?在给定的背景下,很难回答。如果可以选择等待,您可以Task.Run(async ()=> await Something()).Wait();
【参考方案2】:
根据名称,我假设 GetStuffFrom3rdPartyAsync
是 I/O 绑定的。 Parallel
类专门用于 CPU 绑定代码。
在异步世界中,您可以使用Task.WhenAll
启动多个任务,然后(异步)等待它们全部完成。由于您从一个序列开始,将每个元素投影到一个异步操作可能是最简单的,然后等待所有这些操作:
await Task.WhenAll(names.Select(async name =>
// Do some stuff...
var foo = await GetStuffFrom3rdPartyAsync(name);
// Do some more stuff, with the foo.
));
【讨论】:
【参考方案3】:一个接近的选择可能是这样的:
static void ForEach<T>(IEnumerable<T> data, Func<T, Task> func)
var tasks = data.Select(item =>
Task.Run(() => func(item)));
Task.WaitAll(tasks.ToArray());
// ...
ForEach(names, name => GetStuffFrom3rdPartyAsync(name));
理想情况下,您不应该使用像Task.WaitAll
这样的阻塞调用,如果您可以在当前调用堆栈上“一直向下”调用async
的整个方法链:
var tasks = data.Select(item =>
Task.Run(() => func(item)));
await Task.WhenAll(tasks.ToArray());
此外,如果您在 GetStuffFrom3rdPartyAsync
内不做任何 CPU 密集型工作,Task.Run
可能是多余的:
var tasks = data.Select(item => func(item));
【讨论】:
【参考方案4】:正如@Sriram Sakthivel 所指出的,将Parallel.ForEach
与异步lambda 一起使用存在一些问题。 Steven Toub 的ForEachASync
可以做到这一点。他谈论它here,但这里是代码:
public static class Extensions
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate
using (partition) while (partition.MoveNext()) await body(partition.Current);
));
它使用Partitioner
类创建负载平衡分区器(doco),并允许您使用dop
参数指定要运行的线程数。看看它和Parallel.ForEach
的区别。试试下面的代码。
class Program
public static async Task GetStuffParallelForEach()
var data = Enumerable.Range(1, 10);
Parallel.ForEach(data, async i =>
await Task.Delay(1000 * i);
Console.WriteLine(i);
);
public static async Task GetStuffForEachAsync()
var data = Enumerable.Range(1, 10);
await data.ForEachAsync(5, async i =>
await Task.Delay(1000 * i);
Console.WriteLine(i);
);
static void Main(string[] args)
//GetStuffParallelForEach().Wait(); // Finished printed before work is complete
GetStuffForEachAsync().Wait(); // Finished printed after all work is done
Console.WriteLine("Finished");
Console.ReadLine();
如果您运行GetStuffForEachAsync
,程序会等待所有工作完成。如果你运行GetStuffParallelForEach
,那么Finished
这一行将在工作完成之前被打印出来。
【讨论】:
以上是关于在一些 .NET Parallel.ForEach() 代码中做一些异步/等待可以吗?的主要内容,如果未能解决你的问题,请参考以下文章
在 .NET 3.5 中将 Parallel.Foreach 与分区器一起使用
VS2015升级后的垃圾收集和Parallel.ForEach问题