如何实现一个高效的 WhenEach 流式传输任务结果的 IAsyncEnumerable?

Posted

技术标签:

【中文标题】如何实现一个高效的 WhenEach 流式传输任务结果的 IAsyncEnumerable?【英文标题】:How to implement an efficient WhenEach that streams an IAsyncEnumerable of task results? 【发布时间】:2020-01-31 07:14:49 【问题描述】:

我正在尝试使用C# 8 提供的新工具更新我的工具集,其中一种似乎特别有用的方法是返回IAsyncEnumerableTask.WhenAll 版本。此方法应在任务结果可用时立即对其进行流式传输,因此将其命名为 WhenAll 没有多大意义。 WhenEach 听起来更合适。该方法的签名是:

public static IAsyncEnumerable<TResult> WhenEach<TResult>(Task<TResult>[] tasks);

这个方法可以这样使用:

var tasks = new Task<int>[]

    ProcessAsync(1, 300),
    ProcessAsync(2, 500),
    ProcessAsync(3, 400),
    ProcessAsync(4, 200),
    ProcessAsync(5, 100),
;

await foreach (int result in WhenEach(tasks))

    Console.WriteLine($"Processed: result");


static async Task<int> ProcessAsync(int result, int delay)

    await Task.Delay(delay);
    return result;

预期输出:

已处理:5 已处理:4 已处理:1 已处理:3 已处理:2

我设法在循环中使用方法Task.WhenAny编写了一个基本实现,但是这种方法存在问题:

public static async IAsyncEnumerable<TResult> WhenEach<TResult>(
    Task<TResult>[] tasks)

    var hashSet = new HashSet<Task<TResult>>(tasks);
    while (hashSet.Count > 0)
    
        var task = await Task.WhenAny(hashSet).ConfigureAwait(false);
        yield return await task.ConfigureAwait(false);
        hashSet.Remove(task);
    

问题在于性能。 Task.WhenAny 的 implementation 创建所提供任务列表的防御性副本,因此在循环中重复调用它会导致 O(n²) 计算复杂度。我幼稚的实现很难处理 10,000 个任务。我的机器上的开销将近 10 秒。我希望该方法几乎与内置 Task.WhenAll 一样高效,可以轻松处理数十万个任务。如何改进WhenEach 方法以使其表现得体?

【问题讨论】:

也许这对你有一些用处:devblogs.microsoft.com/pfxteam/… 大约在文章的中途你会看到一个性能版本。 @JohanP 有趣的文章,谢谢!分而治之的技术(在子序列中应用Task.WhenAny)作为可能的解决方案在我脑海中浮现,但它很复杂,可能仍然不是最佳的。 ContinueWith 的另一种技术似乎更有希望,但我很难想象它如何与 IAsyncEnumerable 结合作为返回值。 不幸的是,您将无法在匿名方法中屈服,因此我无法确定 ContinueWith。 @TheGeneral 是的,我想不出用ContinueWith 方法来超越这个限制的方法。 【参考方案1】:

通过使用this文章中的代码,您可以实现以下内容:

public static Task<Task<T>>[] Interleaved<T>(IEnumerable<Task<T>> tasks)

   var inputTasks = tasks.ToList();

   var buckets = new TaskCompletionSource<Task<T>>[inputTasks.Count];
   var results = new Task<Task<T>>[buckets.Length];
   for (int i = 0; i < buckets.Length; i++)
   
       buckets[i] = new TaskCompletionSource<Task<T>>();
       results[i] = buckets[i].Task;
   

   int nextTaskIndex = -1;
   Action<Task<T>> continuation = completed =>
   
       var bucket = buckets[Interlocked.Increment(ref nextTaskIndex)];
       bucket.TrySetResult(completed);
   ;

   foreach (var inputTask in inputTasks)
       inputTask.ContinueWith(continuation, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

   return results;

然后将您的WhenEach 更改为调用Interleaved 代码

public static async IAsyncEnumerable<TResult> WhenEach<TResult>(Task<TResult>[] tasks)

    foreach (var bucket in Interleaved(tasks))
    
        var t = await bucket;
        yield return await t;
    

然后你可以像往常一样拨打你的WhenEach

await foreach (int result in WhenEach(tasks))

    Console.WriteLine($"Processed: result");

我对 10k 个任务进行了一些基本的基准测试,并在速度方面提高了 5 倍。

【讨论】:

我接受这个答案是因为它非常高效,到处运行,并且不依赖外部包!【参考方案2】:

您可以将 Channel 用作异步队列。每个任务完成后都可以写入通道。频道中的项目将通过ChannelReader.ReadAllAsync 作为 IAsyncEnumerable 返回。

IAsyncEnumerable<T> ToAsyncEnumerable<T>(IEnumerable<Task<T>> inputTasks)

    var channel=Channel.CreateUnbounded<T>();
    var writer=channel.Writer;
    var continuations=inputTasks.Select(t=>t.ContinueWith(x=>
                                           writer.TryWrite(x.Result)));
    _ = Task.WhenAll(continuations)
            .ContinueWith(t=>writer.Complete(t.Exception));

    return channel.Reader.ReadAllAsync();

当所有任务完成时,调用writer.Complete() 关闭频道。

为了测试这一点,此代码生成延迟递减的任务。这应该以相反的顺序返回索引:

var tasks=Enumerable.Range(1,4)
                    .Select(async i=>
                     
                      await Task.Delay(300*(5-i));
                      return i;
                    );

await foreach(var i in Interleave(tasks))

     Console.WriteLine(i);


生产:

4
3
2
1

【讨论】:

感谢 Panagiotis 的出色回答!您的解决方案与 JohanP 的解决方案表现同样出色,并且在内存分配方面更胜一筹。不过,它处理异常的方式不同。您的解决方案将所有异常的传播延迟到流结束,而 JohanP 的解决方案在第一个任务失败时立即抛出。我不确定哪种行为更有用。您的解决方案的缺点是它不能在 .NET Framework 上编译,因为 Reader.ReadAllAsync 方法是 .NET Core 特定的。有没有办法让它对 .NET Framework 友好? @TheodorZoulias 我还想要立即传播异常,所以我添加了一个基于此的解决方案以实现这一点:***.com/a/62204126/1428743。虽然它仍然使用Reader.ReadAllAsync,但如果您仍然需要.NET Framework 支持,我不确定它是否适合您。【参考方案3】:

只是为了好玩,使用System.ReactiveSystem.Interactive.Async

public static async IAsyncEnumerable<TResult> WhenEach<TResult>(
    Task<TResult>[] tasks)
    => Observable.Merge(tasks.Select(t => t.ToObservable())).ToAsyncEnumerable()

【讨论】:

为什么不System.Linq.Async :P ? System.Interactive.Async 使用System.Linq.Async 感谢 Paulo 提供简洁明了的解决方案!不幸的是,它不能很好地扩展。在 20,000 个任务中,我的机器已经有大约 5 秒的开销。相比之下,JohanP 的 solution 在 100,000 个任务中的开销不到半秒。【参考方案4】:

我真的很喜欢 solution provided by Panagiotis,但仍然希望引发异常,就像在 JohanP 的解决方案中一样。

为了实现这一点,我们可以稍微修改一下,在任务失败时尝试关闭通道:

public IAsyncEnumerable<T> ToAsyncEnumerable<T>(IEnumerable<Task<T>> inputTasks)

    if (inputTasks == null)
    
        throw new ArgumentNullException(nameof(inputTasks), "Task list must not be null.");
    

    var channel = Channel.CreateUnbounded<T>();
    var channelWriter = channel.Writer;
    var inputTaskContinuations = inputTasks.Select(inputTask => inputTask.ContinueWith(completedInputTask =>
    
        // Check whether the task succeeded or not
        if (completedInputTask.Status == TaskStatus.RanToCompletion)
        
            // Write the result to the channel on successful completion
            channelWriter.TryWrite(completedInputTask.Result);
        
        else
        
            // Complete the channel on failure to immediately communicate the failure to the caller and prevent additional results from being returned
            var taskException = completedInputTask.Exception?.InnerException ?? completedInputTask?.Exception;
            channelWriter.TryComplete(taskException);
        
    ));

    // Ensure the writer is closed after the tasks are all complete, and propagate any exceptions from the continuations
    _ = Task.WhenAll(inputTaskContinuations).ContinueWith(completedInputTaskContinuationsTask => channelWriter.TryComplete(completedInputTaskContinuationsTask.Exception));

    // Return the async enumerator of the channel so results are yielded to the caller as they're available
    return channel.Reader.ReadAllAsync();

这样做的明显缺点是遇到的第一个错误将结束枚举并阻止返回任何其他可能成功的结果。这是我的用例可以接受的权衡,但可能不适用于其他用例。

【讨论】:

感谢@PseudoPsyche 的回答!我注意到当它遇到取消的任务时它的行为很奇怪。生成的IAsyncEnumerable 在完成第一个取消的任务后立即成功完成。 啊,是的,我现在明白了。通过添加对TaskStatus.Canceled 的检查应该很容易处理,因此在这种情况下它不会关闭通道。没有注意到,因为我的场景没有使用任务取消。【参考方案5】:

我要为这个问题再添加一个答案,因为有几个问题需要解决。

    建议创建异步可枚举序列的方法应具有CancellationToken 参数。这会在await foreach 循环中启用WithCancellation 配置。 建议当异步操作将延续附加到任务时,应在操作完成时清理这些延续。因此,例如,如果WhenEach 方法的调用者决定提前退出await foreach 循环(使用breakreturn 等),或者如果循环由于异常而提前终止,我们不想留下一堆死去的延续,依附于任务。如果在循环中重复调用 WhenEach(例如,作为 Retry 功能的一部分),这一点尤其重要。

下面的实现解决了这两个问题。它基于Channel&lt;Task&lt;TResult&gt;&gt;。现在channels 已成为 .NET 平台的一个组成部分,因此没有理由避免使用它们来支持更复杂的基于TaskCompletionSource 的解决方案。

public async static IAsyncEnumerable<TResult> WhenEach<TResult>(
    Task<TResult>[] tasks,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)

    if (tasks == null) throw new ArgumentNullException(nameof(tasks));
    var channel = Channel.CreateUnbounded<Task<TResult>>();
    using var completionCts = new CancellationTokenSource();
    var continuations = new List<Task>(tasks.Length);
    try
    
        int pendingCount = tasks.Length;
        foreach (var task in tasks)
        
            if (task == null) throw new ArgumentException(
                $"The tasks argument included a null value.", nameof(tasks));
            continuations.Add(task.ContinueWith(t =>
            
                bool accepted = channel.Writer.TryWrite(t);
                Debug.Assert(accepted);
                if (Interlocked.Decrement(ref pendingCount) == 0)
                    channel.Writer.Complete();
            , completionCts.Token, TaskContinuationOptions.ExecuteSynchronously |
                TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default));
        

        await foreach (var task in channel.Reader.ReadAllAsync(cancellationToken)
            .ConfigureAwait(false))
        
            yield return await task.ConfigureAwait(false);
            cancellationToken.ThrowIfCancellationRequested();
        
    
    finally
    
        completionCts.Cancel();
        try  await Task.WhenAll(continuations).ConfigureAwait(false); 
        catch (OperationCanceledException)   // Ignore
    

finally 块负责取消附加的延续,并在退出之前等待它们完成。

await foreach 循环中的 ThrowIfCancellationRequested 可能看起来多余,但实际上它是必需的,因为 ReadAllAsync 方法的设计行为,这在 here 中进行了解释。


注意: finally 块中的 OperationCanceledException 被低效的 try/catch 块抑制。捕获异常is expensive。一种更有效的实现将通过使用专门的 SuppressException 等待器(如 this 答案中的特色)等待继续来抑制错误,并特殊处理 IsCanceled 案例。出于此答案的目的,修复这种低效率可能是矫枉过正。 WhenEach 方法不太可能在紧密循环中使用。

【讨论】:

以上是关于如何实现一个高效的 WhenEach 流式传输任务结果的 IAsyncEnumerable?的主要内容,如果未能解决你的问题,请参考以下文章

在 .NET 中跨进程边界高效地流式传输数据

多任务处理@Raspi?在流式传输音频时运行 python 脚本

Haskell 中字节流的高效流式传输和操作

RTP/RTCP协议介绍

gRPC - Firestore 如何实现服务器-> 客户端实时流式传输

iOS:如何将音频数据从客户端流式传输到服务器?