如何等待来自 IAsyncEnumerable<> 的所有结果?
Posted
技术标签:
【中文标题】如何等待来自 IAsyncEnumerable<> 的所有结果?【英文标题】:How to await all results from an IAsyncEnumerable<>? 【发布时间】:2020-03-13 20:34:23 【问题描述】:我正在修改 C# 8.0 中的新 IAsyncEnumerable<T>
内容。假设我在某处想使用某种方法:
public IAsyncEnumerable<T> SomeBlackBoxFunctionAsync<T>(...) ...
我知道我可以将它与await foreach...
语法一起使用。但是,假设我的消费者需要从该函数获得所有结果,然后才能继续。在继续之前等待所有结果的最佳语法是什么?换句话说,我希望能够执行以下操作:
// but that extension - AllResultsAsync() - doesn't exist :-/
List<T> myList = await SomeBlackBoxFunctionAsync<T>().AllResultsAsync();
这样做的正确方法是什么?
【问题讨论】:
Task.WaitAll() ?? @AzharKhorasany 该语法是什么样的?我已经修改了Task.WhenAll()
,但我无法让它工作。
await foreach (var item in SomeBlackBoxFunctionAsync<T>()) myList.Add(item);
从您的方法中返回任务,然后等待所有。
为什么要在处理结果之前消耗整个流?根据定义,异步流可能永远不会结束
【参考方案1】:
首先警告:根据定义,异步流可能永远不会结束并一直产生结果,直到应用程序终止。这已经在例如 SignalR 或 gRPC 中使用。轮询循环也以这种方式工作。
在异步流上使用 ToListAsync
可能会产生意想不到的后果。
System.Linq.Async 包已经提供了这样的运算符。
通过ToListAsync 可以使用整个流。代码*看似简单,但隐藏了一些有趣的问题:
public static ValueTask<List<TSource>> ToListAsync<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken = default)
if (source == null)
throw Error.ArgumentNull(nameof(source));
if (source is IAsyncIListProvider<TSource> listProvider)
return listProvider.ToListAsync(cancellationToken);
return Core(source, cancellationToken);
static async ValueTask<List<TSource>> Core(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
var list = new List<TSource>();
await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
list.Add(item);
return list;
首先,它返回一个ValueTask
。其次,它确保观察到取消并使用ConfigureAwait(false)
,以防止死锁。最后,如果源已经提供了自己的ToListAsync
实现,则运营商会遵照执行。
【讨论】:
注意:IAsyncIListProvider
接口包含在System.Linq.Async
包中(以及其他两个不起眼的接口),并且没有由任何公开可见的类实现。【参考方案2】:
作为一个选项,您可以使用ToArrayAsync
扩展方法,在System.Linq.Async
包中定义
public static ValueTask<TSource[]> ToArrayAsync<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken = default)
根据定义,它扩展了IAsyncEnumerable
接口
【讨论】:
【参考方案3】:根据@DmitryBychenko 的评论,我写了一个扩展来做我想要的:
public static async Task<ICollection<T>> AllResultsAsync<T>(this IAsyncEnumerable<T> asyncEnumerable)
if (null == asyncEnumerable)
throw new ArgumentNullException(nameof(asyncEnumerable));
var list = new List<T>();
await foreach (var t in asyncEnumerable)
list.Add(t);
return list;
我只是有点惊讶这不是 C# 8.0 原生提供的......这似乎是一个非常明显的需求。
【讨论】:
看看这个 GitHub thread,基本上有一个基于社区的包和repo,带有对 AsyncEnumerable 的 linq 支持。或者使用 Rx 提供的System.Linq.Async
。
请为asyncEnumerable
添加验证(因为AllResultsAsync
是public
方法) - 它不能是null
并且有我的+1跨度>
它已作为 System.Linq.Async 包的一部分提供。
@PanagiotisKanavos 很高兴听到它! System.Linq.Async 中调用的方法是什么?如果它在那里,那应该是我问题的正确答案。以上是关于如何等待来自 IAsyncEnumerable<> 的所有结果?的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 SqlDataReader 返回和使用 IAsyncEnumerable
如何强制 IAsyncEnumerable 尊重 CancellationToken