在 C#8 IAsyncEnumerable<T> 中并行化收益返回
Posted
技术标签:
【中文标题】在 C#8 IAsyncEnumerable<T> 中并行化收益返回【英文标题】:Parallelize yield return inside C#8 IAsyncEnumerable<T> 【发布时间】:2020-12-03 01:30:03 【问题描述】:我有一个返回异步枚举器的方法
public async IAsyncEnumerable<IResult> DoWorkAsync()
await Something();
foreach (var item in ListOfWorkItems)
yield return DoWork(item);
和调用者:
public async Task LogResultsAsync()
await foreach (var result in DoWorkAsync())
Console.WriteLine(result);
因为DoWork
是一项昂贵的操作,我更愿意以某种方式并行化它,所以它的工作方式类似于:
public async IAsyncEnumerable<IResult> DoWorkAsync()
await Something();
Parallel.ForEach(ListOfWorkItems, item =>
yield return DoWork(item);
);
但是我无法从Parallel.Foreach
内部进行收益回报,所以想知道最好的方法是什么?
返回结果的顺序无关紧要。
谢谢。
编辑:对不起,我在DoWorkAsync
中遗漏了一些代码,它确实在等待我只是没有把它放在上面的代码中的东西,因为这与问题不太相关。现在更新了
Edit2: DoWork
在我的情况下主要是 I/O 绑定的,它正在从数据库中读取数据。
【问题讨论】:
你什么都没有等待,为什么DoWorkAsync
那么需要异步呢?
DoWorkAsync
实际上是异步的吗?你没有使用await
。
IAsyncEnumerable
的设计意味着来自两个方向的压力:生产者在消费者消费完前一个元素之前不能生产另一个元素,消费者直到消费者消费完一个新元素生产者制作了它。听起来你不想那样,这很好。既然ListOfWorkItems.AsParallel().Select(x => DoWork(x))
返回一个ParallelQuery<T>
,为什么不返回一个ParallelQuery<T>
? (如果顺序无关紧要,可能使用AsUnordered()
)。如果你需要一个IAsyncEnumerable
,你可以遍历ParallelQuery<T>
并产生每个元素
@canton7 我实际上不知道IAsyncEnumerable
是这样工作的。这是个好主意,谢谢
@MindSwipe 更新了问题.. 很抱歉造成混乱
【参考方案1】:
这是一个使用来自TPL Dataflow 库的TransformBlock
的基本实现:
public async IAsyncEnumerable<IResult> GetResults(List<IWorkItem> workItems)
// Define the dataflow block
var block = new TransformBlock<IWorkItem, IResult>(async item =>
return await TransformAsync(item);
, new ExecutionDataflowBlockOptions()
MaxDegreeOfParallelism = 10, // the default is 1
EnsureOrdered = false // the default is true
);
// Feed the block with input data
foreach (var item in workItems)
block.Post(item);
block.Complete();
// Stream the block's output as IAsyncEnumerable
while (await block.OutputAvailableAsync())
while (block.TryReceive(out var result))
yield return result;
// Propagate possible exceptions
await block.Completion;
这个实现并不完美,因为如果IAsyncEnumerable
的使用者过早地放弃枚举,TransformBlock
将继续在后台工作,直到所有工作项都处理完。它也不支持取消,所有受人尊敬的IAsyncEnumerable
生产方法都应该支持。这些缺失的功能可以相对容易地添加。如果您有兴趣添加它们,请查看this 问题。
【讨论】:
要清楚,因为在返回任何结果之前必须输入所有数据,并且由于 maxDegreeofParallism,对于大输入(例如 10k),事情实际上不会开始返回立即地?第 10,000 个项目在至少完成之前的 9989 个项目之前不会发布,此时这 9989 个项目将被设置为立即产生回报。 (所有这些都假定 Post 在满足 MaxDegreeofParallelism 时阻塞,如果不是这样,请纠正我。) @CalebHolt 不,Post
方法将消息推送到TransformBlock
的内部缓冲区中,并立即返回。如果缓冲区已满,则不接受消息并且Post
返回false
。在我的示例中,TransformBlock
未配置BoundedCapacity
,因此其缓冲区大小不受限制,并且所有消息都将被接受。更复杂的方法是使用合理的BoundedCapacity
对其进行配置,并使用异步await block.SendAsync(item);
而不是Post
提供TransformBlock
。【参考方案2】:
根据 canton7 的建议,您可以使用 AsParallel
而不是 Parallel.ForEach
。
这可以在标准 foreach
循环中使用,您可以在其中产生结果:
public async IAsyncEnumerable<IResult> DoWorkAsync()
await Something();
foreach (var result in ListOfWorkItems.AsParallel().Select(DoWork))
yield return result;
正如 Theodor Zoulias 所说,返回的可枚举实际上根本不是异步的。
如果您只需要使用 await foreach
来使用它,这应该不是问题,但更明确地说,您可以返回 IEnumerable
并让调用者并行化它:
public async Task<IEnumerable<Item>> DoWorkAsync()
await Something();
return ListOfWorkItems;
// Caller...
Parallel.ForEach(await DoWorkAsync(), item =>
var result = DoWork(item);
//...
);
虽然如果需要在多个地方调用它可能不太容易维护
【讨论】:
这种方法有两个问题。 (1) PLINQ 非常积极地缓冲产生的结果。这可以通过选项WithMergeOptions(ParallelMergeOptions.NotBuffered)
修复。 (2) PLINQ 使用当前线程作为工作线程之一,因此调用者将被阻塞在一个yield return
和下一个之间。这违背了IAsyncEnumerable
中 Async 的全部目的,并且无法修复。
@TheodorZoulias 对于第 (2) 点,该方法将在 await
之后在线程池上恢复,因此使用该线程的 PLINQ 不会影响调用者。除非有同步上下文,在这种情况下只需添加 ConfigureAwait(false)
。
是的,很可能它将在ThreadPool
上恢复,假设Something().ConfigureAwait(false)
不会返回已完成的任务。但这并不是使枚举异步的原因。真正异步意味着至少对IAsyncEnumerator.MoveNextAsync()
的一些调用将返回未完成的任务。如果所有这些调用都阻塞了当前线程并返回已完成的任务,那么您实际上有一个伪装成IAsyncEnumerable
的IEnumerable
。返回IEnumerable
并在ThreadPool
上枚举它会更诚实。
@TheodorZoulias 也许是的。但是如果调用Something()
和迭代ListOfWorkItems
是相互依赖的,那么将它们放在一个方法中可能会产生更易于维护的代码。等待Something()
释放线程(假设它实际上是异步的),然后迭代必须以阻塞方式发生,而不管它是从哪里调用的。
你说得有道理。在这种情况下,返回 IAsyncEnumerable
有一些优点。不过,这不是该技术的预期用途。返回IAsyncEnumerable
是一个合约,并创建调用者不会被阻塞的期望。此答案的实现仅在await Something()
的持续时间内是异步的,并且枚举的其余部分是阻塞的,因此它违反了合同。我的论点是 PLINQ 可能不是解决这个问题的正确工具。以上是关于在 C#8 IAsyncEnumerable<T> 中并行化收益返回的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 C#8 IAsyncEnumerable<T> 来异步枚举并行运行的任务
C# 8.0 是不是支持 IAsyncEnumerable?