如何等待来自 IAsyncEnumerable<> 的所有结果?

Posted

技术标签:

【中文标题】如何等待来自 IAsyncEnumerable<> 的所有结果?【英文标题】:How to await all results from an IAsyncEnumerable<>? 【发布时间】:2020-03-13 20:34:23 【问题描述】:

我正在修改 C# 8.0 中的新 IAsyncEnumerable&lt;T&gt; 内容。假设我在某处想使用某种方法:

public IAsyncEnumerable<T> SomeBlackBoxFunctionAsync<T>(...)  ... 

我知道我可以将它与await foreach... 语法一起使用。但是,假设我的消费者需要从该函数获得所有结果,然后才能继续。在继续之前等待所有结果的最佳语法是什么?换句话说,我希望能够执行以下操作:

// but that extension - AllResultsAsync() - doesn't exist :-/
List<T> myList = await SomeBlackBoxFunctionAsync<T>().AllResultsAsync(); 

这样做的正确方法是什么?

【问题讨论】:

Task.WaitAll() ?? @AzharKhorasany 该语法是什么样的?我已经修改了Task.WhenAll(),但我无法让它工作。 await foreach (var item in SomeBlackBoxFunctionAsync&lt;T&gt;()) myList.Add(item); 从您的方法中返回任务,然后等待所有。 为什么要在处理结果之前消耗整个流?根据定义,异步流可​​能永远不会结束 【参考方案1】:

首先警告:根据定义,异步流可​​能永远不会结束并一直产生结果,直到应用程序终止。这已经在例如 SignalR 或 gRPC 中使用。轮询循环也以这种方式工作。

在异步流上使用 ToListAsync 可能会产生意想不到的后果。


System.Linq.Async 包已经提供了这样的运算符。

通过ToListAsync 可以使用整个流。代码*看似简单,但隐藏了一些有趣的问题:

public static ValueTask<List<TSource>> ToListAsync<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken = default)

    if (source == null)
        throw Error.ArgumentNull(nameof(source));

    if (source is IAsyncIListProvider<TSource> listProvider)
        return listProvider.ToListAsync(cancellationToken);

    return Core(source, cancellationToken);

    static async ValueTask<List<TSource>> Core(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
    
        var list = new List<TSource>();

        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        
            list.Add(item);
        

        return list;
    

首先,它返回一个ValueTask。其次,它确保观察到取消并使用ConfigureAwait(false),以防止死锁。最后,如果源已经提供了自己的ToListAsync 实现,则运营商会遵照执行。

【讨论】:

注意:IAsyncIListProvider 接口包含在System.Linq.Async 包中(以及其他两个不起眼的接口),并且没有由任何公开可见的类实现。【参考方案2】:

作为一个选项,您可以使用ToArrayAsync 扩展方法,在System.Linq.Async 包中定义

public static ValueTask<TSource[]> ToArrayAsync<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken = default)

根据定义,它扩展了IAsyncEnumerable 接口

【讨论】:

【参考方案3】:

根据@DmitryBychenko 的评论,我写了一个扩展来做我想要的:

    public static async Task<ICollection<T>> AllResultsAsync<T>(this IAsyncEnumerable<T> asyncEnumerable)
    
        if (null == asyncEnumerable)
            throw new ArgumentNullException(nameof(asyncEnumerable));  

        var list = new List<T>();
        await foreach (var t in asyncEnumerable)
        
            list.Add(t);
        

        return list;
    

我只是有点惊讶这不是 C# 8.0 原生提供的......这似乎是一个非常明显的需求。

【讨论】:

看看这个 GitHub thread,基本上有一个基于社区的包和repo,带有对 AsyncEnumerable 的 linq 支持。或者使用 Rx 提供的System.Linq.Async 请为asyncEnumerable 添加验证(因为AllResultsAsyncpublic 方法) - 它不能是null 并且有我的+1跨度> 作为 System.Linq.Async 包的一部分提供。 @PanagiotisKanavos 很高兴听到它! System.Linq.Async 中调用的方法是什么?如果它在那里,那应该是我问题的正确答案。

以上是关于如何等待来自 IAsyncEnumerable<> 的所有结果?的主要内容,如果未能解决你的问题,请参考以下文章

并行运行异步查询 (IAsyncEnumerable)

如何使用 SqlDataReader 返回和使用 IAsyncEnumerable

如何强制 IAsyncEnumerable 尊重 CancellationToken

如何在存储库类中使用 IAsyncEnumerable

如何实现一个高效的 WhenEach 流式传输任务结果的 IAsyncEnumerable?

如何在实际迭代发生之前验证 IAsyncEnumerable 返回方法的参数?