如何安全地迭代 IAsyncEnumerable 以向下游发送集合以批量处理消息
Posted
技术标签:
【中文标题】如何安全地迭代 IAsyncEnumerable 以向下游发送集合以批量处理消息【英文标题】:How to safely iterate over an IAsyncEnumerable to send a collection downstream for message processing in batches 【发布时间】:2020-11-18 20:28:40 【问题描述】:我在LINQ with IAsyncEnumerable 上观看了聊天,这让我对处理 IAsyncEnumerables 的扩展方法有了一些了解,但坦率地说,对于现实世界的应用程序来说不够详细,尤其是就我的经验水平而言,我理解IAsyncEnumerable
s 的样本/文档目前还不存在
我正在尝试从文件中读取,对流进行一些转换,返回 IAsyncEnumerable
,然后在获得任意数量的对象后将这些对象发送到下游,例如:
await foreach (var data in ProcessBlob(downloadedFile))
//todo add data to List<T> called listWithPreConfiguredNumberOfElements
if (listWithPreConfiguredNumberOfElements.Count == preConfiguredNumber)
await _messageHandler.Handle(listWithPreConfiguredNumberOfElements);
//repeat the behaviour till all the elements in the IAsyncEnumerable returned by ProcessBlob are sent downstream to the _messageHandler.
到目前为止,我对此事的理解是await foreach
行正在处理使用Task
s(或ValueTask
s)的数据,所以我们没有预先计算.我也对使用 List 变量犹豫不决,只是对其进行长度检查,因为跨线程共享该数据似乎不是线程安全的。
我正在使用System.Linq.Async
包,希望我可以使用相关的扩展方法。我可以看到TakeWhile
形式的一些承诺,但我对我打算执行的任务的线程安全性的理解并不完全,这让我失去了信心。
我们将不胜感激任何帮助或推动正确方向,谢谢。
【问题讨论】:
System.Linq.Async
是响应式扩展的一部分
我的第一个想法是 TPL DataFlow 与 BatchBlock...
样本和文档在那里。人们认为 IAsyncEnumerable 比实际情况要多。它“只是”一种异步枚举的方式,而不是一种构建管道的新方式,也不是一种新的多线程方式。它既不是数据流块也不是通道。它可以是管道中步骤之间的粘合剂
是否可以使用 ListIEnumerable<T>
.如果您有多个来自源的任务,则需要ConcurrentQueue
。如果您只有一个任务,您可以使用List
,尽管这会阻止您使用多个任务。一个批处理操作虽然不需要多个任务IAsyncEnumerable
作为参数并 return 另一个的方法。您可以一个接一个地链接多个方法来创建管道,始终知道每个方法的作用、它如何处理并发等。方法 IAsyncEnumerable<IList<T>>
Batch在包System.Interactive.Async 中有一个运算符Buffer
可以执行您想要的操作。
// Projects each element of an async-enumerable sequence into consecutive
// non-overlapping buffers which are produced based on element count information.
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
this IAsyncEnumerable<TSource> source, int count);
此包包含像 Amb
、Throw
、Catch
、Defer
、Finally
等在 Linq 中没有直接等价物的运算符,但它们在 System.Reactive 中有等价物。这是因为IAsyncEnumerable
s 在概念上更接近IObservable
s 而不是IEnumerable
s(因为两者都有时间维度,而IEnumerable
s 是永恒的)。
【讨论】:
【参考方案2】:我也对使用 List 变量犹豫不决,只是对其进行长度检查,因为跨线程共享该数据似乎不是线程安全的。
在处理async
时,您需要考虑执行流程,而不是线程;由于您是await
-ing 处理步骤,因此访问列表实际上不存在并发问题,因为无论使用哪个线程:一次只能访问一次列表。
如果您仍然担心,您可以new
每批次列出一个列表,但这可能是矫枉过正。然而,您做需要的是两个添加 - 批次之间的重置和最终处理步骤:
var listWithPreConfiguredNumberOfElements = new List<YourType>(preConfiguredNumber);
await foreach (var data in ProcessBlob(downloadedFile)) // CAF?
listWithPreConfiguredNumberOfElements.Add(data);
if (listWithPreConfiguredNumberOfElements.Count == preConfiguredNumber)
await _messageHandler.Handle(listWithPreConfiguredNumberOfElements); // CAF?
listWithPreConfiguredNumberOfElements.Clear(); // reset for a new batch
// (replace this with a "new" if you're still concerned about concurrency)
if (listWithPreConfiguredNumberOfElements.Any())
// process any stragglers
await _messageHandler.Handle(listWithPreConfiguredNumberOfElements); // CAF?
您可能也选择在标记为// CAF?
的三个位置中使用ConfigureAwait(false)
【讨论】:
正是思想上的转变为我解决了这个问题——有点像范式转变并引起了混乱。我赞成这一点,因为解决方案很明显,但它背后的解释真的让我觉得超出了界限,谢谢以上是关于如何安全地迭代 IAsyncEnumerable 以向下游发送集合以批量处理消息的主要内容,如果未能解决你的问题,请参考以下文章
如何在实际迭代发生之前验证 IAsyncEnumerable 返回方法的参数?
在返回带有取消的 IAsyncEnumerable 的函数中迭代 IAsyncEnumerable
如何强制 IAsyncEnumerable 尊重 CancellationToken