在 C#8 IAsyncEnumerable<T> 中并行化收益返回

Posted

技术标签:

【中文标题】在 C#8 IAsyncEnumerable<T> 中并行化收益返回【英文标题】:Parallelize yield return inside C#8 IAsyncEnumerable<T> 【发布时间】:2020-12-03 01:30:03 【问题描述】:

我有一个返回异步枚举器的方法

    public async IAsyncEnumerable<IResult> DoWorkAsync()
    
        await Something();
        foreach (var item in ListOfWorkItems)
        
            yield return DoWork(item);
        
    

和调用者:

    public async Task LogResultsAsync()
    
        await foreach (var result in DoWorkAsync())
        
            Console.WriteLine(result);
        
    

因为DoWork 是一项昂贵的操作,我更愿意以某种方式并行化它,所以它的工作方式类似于:

    public async IAsyncEnumerable<IResult> DoWorkAsync()
    
        await Something();
        Parallel.ForEach(ListOfWorkItems, item =>
        
            yield return DoWork(item);
        );
    

但是我无法从Parallel.Foreach 内部进行收益回报,所以想知道最好的方法是什么?

返回结果的顺序无关紧要。

谢谢。

编辑:对不起,我在DoWorkAsync 中遗漏了一些代码,它确实在等待我只是没有把它放在上面的代码中的东西,因为这与问题不太相关。现在更新了

Edit2: DoWork 在我的情况下主要是 I/O 绑定的,它正在从数据库中读取数据。

【问题讨论】:

你什么都没有等待,为什么DoWorkAsync那么需要异步呢? DoWorkAsync 实际上是异步的吗?你没有使用await IAsyncEnumerable的设计意味着来自两个方向的压力:生产者在消费者消费完前一个元素之前不能生产另一个元素,消费者直到消费者消费完一个新元素生产者制作了它。听起来你不想那样,这很好。既然ListOfWorkItems.AsParallel().Select(x =&gt; DoWork(x)) 返回一个ParallelQuery&lt;T&gt;,为什么不返回一个ParallelQuery&lt;T&gt;? (如果顺序无关紧要,可能使用AsUnordered())。如果你需要一个IAsyncEnumerable,你可以遍历ParallelQuery&lt;T&gt;并产生每个元素 @canton7 我实际上不知道IAsyncEnumerable 是这样工作的。这是个好主意,谢谢 @MindSwipe 更新了问题.. 很抱歉造成混乱 【参考方案1】:

这是一个使用来自TPL Dataflow 库的TransformBlock 的基本实现:

public async IAsyncEnumerable<IResult> GetResults(List<IWorkItem> workItems)

    // Define the dataflow block
    var block = new TransformBlock<IWorkItem, IResult>(async item =>
    
        return await TransformAsync(item);
    , new ExecutionDataflowBlockOptions()
    
        MaxDegreeOfParallelism = 10, // the default is 1
        EnsureOrdered = false // the default is true
    );

    // Feed the block with input data
    foreach (var item in workItems)
    
        block.Post(item);
    
    block.Complete();

    // Stream the block's output as IAsyncEnumerable
    while (await block.OutputAvailableAsync())
    
        while (block.TryReceive(out var result))
        
            yield return result;
        
    

    // Propagate possible exceptions
    await block.Completion;

这个实现并不完美,因为如果IAsyncEnumerable 的使用者过早地放弃枚举,TransformBlock 将继续在后台工作,直到所有工作项都处理完。它也不支持取消,所有受人尊敬的IAsyncEnumerable 生产方法都应该支持。这些缺失的功能可以相对容易地添加。如果您有兴趣添加它们,请查看this 问题。

【讨论】:

要清楚,因为在返回任何结果之前必须输入所有数据,并且由于 maxDegreeofParallism,对于大输入(例如 10k),事情实际上不会开始返回立即地?第 10,000 个项目在至少完成之前的 9989 个项目之前不会发布,此时这 9989 个项目将被设置为立即产生回报。 (所有这些都假定 Post 在满足 MaxDegreeofParallelism 时阻塞,如果不是这样,请纠正我。) @CalebHolt 不,Post 方法将消息推送到TransformBlock 的内部缓冲区中,并立即返回。如果缓冲区已满,则不接受消息并且Post 返回false。在我的示例中,TransformBlock 未配置BoundedCapacity,因此其缓冲区大小不受限制,并且所有消息都将被接受。更复杂的方法是使用合理的BoundedCapacity 对其进行配置,并使用异步await block.SendAsync(item); 而不是Post 提供TransformBlock【参考方案2】:

根据 canton7 的建议,您可以使用 AsParallel 而不是 Parallel.ForEach

这可以在标准 foreach 循环中使用,您可以在其中产生结果:

public async IAsyncEnumerable<IResult> DoWorkAsync()

    await Something();
    foreach (var result in ListOfWorkItems.AsParallel().Select(DoWork))
    
        yield return result;
    


正如 Theodor Zoulias 所说,返回的可枚举实际上根本不是异步的。

如果您只需要使用 await foreach 来使用它,这应该不是问题,但更明确地说,您可以返回 IEnumerable 并让调用者并行化它:

public async Task<IEnumerable<Item>> DoWorkAsync()

    await Something();
    return ListOfWorkItems;


// Caller...
Parallel.ForEach(await DoWorkAsync(), item => 

    var result = DoWork(item);
    //...
);

虽然如果需要在多个地方调用它可能不太容易维护

【讨论】:

这种方法有两个问题。 (1) PLINQ 非常积极地缓冲产生的结果。这可以通过选项WithMergeOptions(ParallelMergeOptions.NotBuffered) 修复。 (2) PLINQ 使用当前线程作为工作线程之一,因此调用者将被阻塞在一个yield return 和下一个之间。这违背了IAsyncEnumerableAsync 的全部目的,并且无法修复。 @TheodorZoulias 对于第 (2) 点,该方法将在 await 之后在线程池上恢复,因此使用该线程的 PLINQ 不会影响调用者。除非有同步上下文,在这种情况下只需添加 ConfigureAwait(false) 是的,很可能它将在ThreadPool 上恢复,假设Something().ConfigureAwait(false) 不会返回已完成的任务。但这并不是使枚举异步的原因。真正异步意味着至少对IAsyncEnumerator.MoveNextAsync() 的一些调用将返回未完成的任务。如果所有这些调用都阻塞了当前线程并返回已完成的任务,那么您实际上有一个伪装成IAsyncEnumerableIEnumerable。返回IEnumerable 并在ThreadPool 上枚举它会更诚实。 @TheodorZoulias 也许是的。但是如果调用Something() 和迭代ListOfWorkItems 是相互依赖的,那么将它们放在一个方法中可能会产生更易于维护的代码。等待Something() 释放线程(假设它实际上是异步的),然后迭代必须以阻塞方式发生,而不管它是从哪里调用的。 你说得有道理。在这种情况下,返回 IAsyncEnumerable 有一些优点。不过,这不是该技术的预期用途。返回IAsyncEnumerable 是一个合约,并创建调用者不会被阻塞的期望。此答案的实现仅在await Something() 的持续时间内是异步的,并且枚举的其余部分是阻塞的,因此它违反了合同。我的论点是 PLINQ 可能不是解决这个问题的正确工具。

以上是关于在 C#8 IAsyncEnumerable<T> 中并行化收益返回的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 C#8 IAsyncEnumerable<T> 来异步枚举并行运行的任务

C# 8.0 是不是支持 IAsyncEnumerable?

如何等待来自 IAsyncEnumerable<> 的所有结果?

使用 IAsyncEnumerable 读取文本文件

迭代时如何突破 IAsyncEnumerable?

将 IAsyncEnumerable 与 Dapper 一起使用