如何安全地迭代 IAsyncEnumerable 以向下游发送集合以批量处理消息

Posted

技术标签:

【中文标题】如何安全地迭代 IAsyncEnumerable 以向下游发送集合以批量处理消息【英文标题】:How to safely iterate over an IAsyncEnumerable to send a collection downstream for message processing in batches 【发布时间】:2020-11-18 20:28:40 【问题描述】:

我在LINQ with IAsyncEnumerable 上观看了聊天,这让我对处理 IAsyncEnumerables 的扩展方法有了一些了解,但坦率地说,对于现实世界的应用程序来说不够详细,尤其是就我的经验水平而言,我理解IAsyncEnumerables 的样本/文档目前还不存在

我正在尝试从文件中读取,对流进行一些转换,返回 IAsyncEnumerable,然后在获得任意数量的对象后将这些对象发送到下游,例如:

await foreach (var data in ProcessBlob(downloadedFile))

    //todo add data to List<T> called listWithPreConfiguredNumberOfElements
    if (listWithPreConfiguredNumberOfElements.Count == preConfiguredNumber)
        await _messageHandler.Handle(listWithPreConfiguredNumberOfElements);
        
    //repeat the behaviour till all the elements in the IAsyncEnumerable returned by ProcessBlob are sent downstream to the _messageHandler.

到目前为止,我对此事的理解是await foreach 行正在处理使用Tasks(或ValueTasks)的数据,所以我们没有预先计算.我也对使用 List 变量犹豫不决,只是对其进行长度检查,因为跨线程共享该数据似乎不是线程安全的。

我正在使用System.Linq.Async 包,希望我可以使用相关的扩展方法。我可以看到TakeWhile 形式的一些承诺,但我对我打算执行的任务的线程安全性的理解并不完全,这让我失去了信心。

我们将不胜感激任何帮助或推动正确方向,谢谢。

【问题讨论】:

System.Linq.Async 是响应式扩展的一部分 我的第一个想法是 TPL DataFlow 与 BatchBlock... 样本和文档在那里。人们认为 IAsyncEnumerable 比实际情况要多。它“只是”一种异步枚举的方式,而不是一种构建管道的新方式,也不是一种新的多线程方式。它既不是数据流块也不是通道。它可以是管道中步骤之间的粘合剂 是否可以使用 List 或需要 ConcurrentQueue 取决于处理代码的工作方式,而不是源 (IAsyncEnumerable),就像它不依赖于 IEnumerable&lt;T&gt; .如果您有多个来自源的任务,则需要ConcurrentQueue。如果您只有一个任务,您可以使用List,尽管这会阻止您使用多个任务。一个批处理操作虽然不需要多个任务 我怀疑你应该先清理你的代码,把它转换成一个可以轻松创建管道的形式。现场级处理程序使事情变得更加困难。使用 LINQ 风格的方法要容易得多 - 接受 IAsyncEnumerable 作为参数并 return 另一个的方法。您可以一个接一个地链接多个方法来创建管道,始终知道每个方法的作用、它如何处理并发等。方法 IAsyncEnumerable&lt;IList&lt;T&gt;&gt; Batch(this IAsyncEnumerable source, int batchSize)` 允许 @ 987654337@ 【参考方案1】:

在包System.Interactive.Async 中有一个运算符Buffer 可以执行您想要的操作。

// Projects each element of an async-enumerable sequence into consecutive
// non-overlapping buffers which are produced based on element count information.
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, int count);

此包包含像 AmbThrowCatchDeferFinally 等在 Linq 中没有直接等价物的运算符,但它们在 System.Reactive 中有等价物。这是因为IAsyncEnumerables 在概念上更接近IObservables 而不是IEnumerables(因为两者都有时间维度,而IEnumerables 是永恒的)。

【讨论】:

【参考方案2】:

我也对使用 List 变量犹豫不决,只是对其进行长度检查,因为跨线程共享该数据似乎不是线程安全的。

在处理async 时,您需要考虑执行流程,而不是线程;由于您是await-ing 处理步骤,因此访问列表实际上不存在并发问题,因为无论使用哪个线程:一次只能访问一次列表。

如果您仍然担心,您可以new 每批次列出一个列表,但这可能是矫枉过正。然而,您需要的是两个添加 - 批次之间的重置和最终处理步骤:

var listWithPreConfiguredNumberOfElements = new List<YourType>(preConfiguredNumber);
await foreach (var data in ProcessBlob(downloadedFile)) // CAF?

    listWithPreConfiguredNumberOfElements.Add(data);
    if (listWithPreConfiguredNumberOfElements.Count == preConfiguredNumber)
    
        await _messageHandler.Handle(listWithPreConfiguredNumberOfElements); // CAF?
        listWithPreConfiguredNumberOfElements.Clear(); // reset for a new batch
        // (replace this with a "new" if you're still concerned about concurrency)
    

if (listWithPreConfiguredNumberOfElements.Any())
   // process any stragglers
    await _messageHandler.Handle(listWithPreConfiguredNumberOfElements); // CAF?

您可能选择在标记为// CAF?的三个位置中使用ConfigureAwait(false)

【讨论】:

正是思想上的转变为我解决了这个问题——有点像范式转变并引起了混乱。我赞成这一点,因为解决方案很明显,但它背后的解释真的让我觉得超出了界限,谢谢

以上是关于如何安全地迭代 IAsyncEnumerable 以向下游发送集合以批量处理消息的主要内容,如果未能解决你的问题,请参考以下文章

如何在实际迭代发生之前验证 IAsyncEnumerable 返回方法的参数?

在返回带有取消的 IAsyncEnumerable 的函数中迭代 IAsyncEnumerable

如何强制 IAsyncEnumerable 尊重 CancellationToken

C#8.0: 在 LINQ 中支持异步的 IAsyncEnumerable

如何安全地迭代互锁的 slist?

如何使用 SqlDataReader 返回和使用 IAsyncEnumerable