如何使用 C#8 IAsyncEnumerable<T> 来异步枚举并行运行的任务

Posted

技术标签:

【中文标题】如何使用 C#8 IAsyncEnumerable<T> 来异步枚举并行运行的任务【英文标题】:How to use C#8 IAsyncEnumerable<T> to async-enumerate tasks run in parallel 【发布时间】:2019-10-24 09:02:43 【问题描述】:

如果可能,我想为并行启动的任务创建一个异步枚举器。所以第一个完成的是枚举的第一个元素,第二个完成的是枚举的第二个元素,等等。

public static async IAsyncEnumerable<T> ParallelEnumerateAsync(this IEnumerable<Task<T>> coldAsyncTasks)

    // ... 

我敢打赌,有一种方法可以使用 ContinueWithQueue&lt;T&gt;,但我并不完全相信自己会实现它。

【问题讨论】:

如果您提供有关您的用例的一些细节,这将是一个更好的问题。我不清楚为什么要这样做。 如果你想要 parallel async 有序任务 使用 TPL 数据流 ActionBlock,或 reactive扩展我相信也可以工作 相关:How to implement an efficient WhenEach that streams an IAsyncEnumerable of task results? 【参考方案1】:

这是你要找的吗?

public static async IAsyncEnumerable<T> ParallelEnumerateAsync<T>(
    this IEnumerable<Task<T>> tasks)

    var remaining = new List<Task<T>>(tasks);

    while (remaining.Count != 0)
    
        var task = await Task.WhenAny(remaining);
        remaining.Remove(task);
        yield return (await task);
    

【讨论】:

这看起来更符合 OP 的要求,再加上一个 我想可以用ISet&lt;Task&lt;T&gt;&gt;优化 可能。测量它。 如果有大量任务(超过1000个),这个解决方案将不再有效。在这种情况下,另一种更有效的按完成排序任务的方法是使用 Stephen Cleary 的 Nito.AsyncEx 库中的 OrderByCompletion 扩展方法。【参考方案2】:

如果我理解你的问题,你的重点是启动所有任务,让它们都并行运行,但确保返回值的处理顺序与启动任务的顺序相同。

查看规范,C# 8.0 Asynchronous Streams 任务排队等待并行执行但顺序返回可以查看像这样。

/// Demonstrates Parallel Execution - Sequential Results with test tasks
async Task RunAsyncStreams()

    await foreach (var n in RunAndPreserveOrderAsync(GenerateTasks(6)))
    
        Console.WriteLine($"#n is returned");
    


/// Returns an enumerator that will produce a number of test tasks running
/// for a random time.
IEnumerable<Task<int>> GenerateTasks(int count)

    return Enumerable.Range(1, count).Select(async n =>
    
        await Task.Delay(new Random().Next(100, 1000));
        Console.WriteLine($"#n is complete");
        return n;
    );


/// Launches all tasks in order of enumeration, then waits for the results
/// in the same order: Parallel Execution - Sequential Results.
async IAsyncEnumerable<T> RunAndPreserveOrderAsync<T>(IEnumerable<Task<T>> tasks)

    var queue = new Queue<Task<T>>(tasks);
    while (queue.Count > 0) yield return await queue.Dequeue();

可能的输出:

#5 is complete
#1 is complete
#1 is returned
#3 is complete
#6 is complete
#2 is complete
#2 is returned
#3 is returned
#4 is complete
#4 is returned
#5 is returned
#6 is returned

实际上,这种模式似乎没有任何新的语言级别支持,而且由于异步流处理IAsyncEnumerable&lt;T&gt;,这意味着基础Task 在这里不起作用,并且所有工作人员async 方法都应该具有相同的Task&lt;T&gt; 返回类型,这在一定程度上限制了基于异步流的设计。

因此并根据您的情况(您希望能够取消长时间运行的任务吗?是否需要按任务处理异常?是否应该限制并发任务的数量?)这可能是有道理的查看@TheGeneral 的建议。

更新:

请注意,RunAndPreserveOrderAsync&lt;T&gt; 不一定必须使用 Queue 的任务 - 选择这只是为了更好地显示编码意图。

var queue = new Queue<Task<T>>(tasks);
while (queue.Count > 0) yield return await queue.Dequeue();

将枚举数转换为List 会产生相同的结果; RunAndPreserveOrderAsync&lt;T&gt; 的正文可以在这里替换为一行

foreach(var task in tasks.ToList()) yield return await task;

在此实现中,重要的是首先生成和启动所有任务,这与Queue 初始化或tasks 可枚举到List 的转换一起完成。但是,可能很难拒绝像这样简化上面的foreach

foreach(var task in tasks) yield return await task;

这将导致任务按顺序执行而不是并行运行。

【讨论】:

【参考方案3】:

我对这项任务的看法。从本主题的其他答案中大量借鉴,但(希望)有一些增强。所以想法是启动任务并将它们放入队列中,与其他答案相同,但像 Theodor Zoulias 一样,我也在尝试限制最大并行度。但是,我试图克服他在评论中提到的限制,方法是在之前的任何任务完成后立即使用任务延续来排队下一个任务。通过这种方式,我们当然可以在配置的限制范围内最大限度地同时运行任务的数量。

我不是异步专家,此解决方案可能存在多线程死锁和其他 Heisenbug,我没有测试异常处理等,因此已警告您。

public static async IAsyncEnumerable<TResult> ExecuteParallelAsync<TResult>(IEnumerable<Task<TResult>> coldTasks, int degreeOfParallelism)

    if (degreeOfParallelism < 1)
        throw new ArgumentOutOfRangeException(nameof(degreeOfParallelism));

    if (coldTasks is ICollection<Task<TResult>>) throw new ArgumentException(
        "The enumerable should not be materialized.", nameof(coldTasks));

    var queue = new ConcurrentQueue<Task<TResult>>();

    using var enumerator = coldTasks.GetEnumerator();
    
    for (var index = 0; index < degreeOfParallelism && EnqueueNextTask(); index++) ;

    while (queue.TryDequeue(out var nextTask)) yield return await nextTask;

    bool EnqueueNextTask()
    
        lock (enumerator)
        
            if (!enumerator.MoveNext()) return false;

            var nextTask = enumerator.Current
                .ContinueWith(t =>
                
                    EnqueueNextTask();
                    return t.Result;
                );
            queue.Enqueue(nextTask);
            return true;
        
    

我们使用这种方法来生成测试任务(借用 DK 的回答):

IEnumerable<Task<int>> GenerateTasks(int count)

    return Enumerable.Range(1, count).Select(async n =>
    
        Console.WriteLine($"#n started");
        await Task.Delay(new Random().Next(100, 1000));
        Console.WriteLine($"#n completed");
        return n;
    );

还有他(或她)的测试运行者:

async void Main()

    await foreach (var n in ExecuteParallelAsync(GenerateTasks(9),3))
    
        Console.WriteLine($"#n returned");
    

我们在 LinqPad 中得到了这个结果(这太棒了,顺便说一句)

#1 started
#2 started
#3 started
#3 is complete
#4 started
#2 is complete
#5 started
#1 is complete
#6 started
#1 is returned
#2 is returned
#3 is returned
#4 is complete
#7 started
#4 is returned
#6 is complete
#8 started
#7 is complete
#9 started
#8 is complete
#5 is complete
#5 is returned
#6 is returned
#7 is returned
#8 is returned
#9 is complete
#9 is returned

注意下一个任务是如何在之前的任务完成后立即开始的,以及它们返回的顺序是如何保留的。

【讨论】:

@Theodor Zoulias 谢谢。但是 EqueueNextTask 中的所有内容都发生在锁中,所以基本上只有一个线程可以在任何给定时刻对迭代器执行任何操作?任何完成的任务仍然需要等待锁定才能访问它? 你说得对,我是瞎子。我正在删除我的评论。 ? 赞成。您的解决方案肯定是我的改进。我能想到的唯一缺点是,如果出现异常,它将被包裹在AggregateException 中,因为访问了Task.Result 属性。 谢谢 :) 是的,关于所有这些异步的东西,我还有很多东西要学。虽然它似乎仍然比常规线程更容易。【参考方案4】:

如果您想获取异步流 (IAsyncEnumerable) 并并行运行 Select,那么第一个完成的就是第一个出来的:

/// <summary>
/// Runs the selectors in parallel and yields in completion order
/// </summary>
public static async IAsyncEnumerable<TOut> SelectParallel<TIn, TOut>(
    this IAsyncEnumerable<TIn> source,
    Func<TIn, Task<TOut>> selector)

    if (source == null)
    
        throw new InvalidOperationException("Source is null");
    

    var enumerator = source.GetAsyncEnumerator();

    var sourceFinished = false;
    var tasks = new HashSet<Task<TOut>>();

    Task<bool> sourceMoveTask = null;
    Task<Task<TOut>> pipeCompletionTask = null;

    try
    
        while (!sourceFinished || tasks.Any())
        
            if (sourceMoveTask == null && !sourceFinished)
            
                sourceMoveTask = enumerator.MoveNextAsync().AsTask();
            

            if (pipeCompletionTask == null && tasks.Any())
            
                pipeCompletionTask = Task.WhenAny<TOut>(tasks);
            

            var coreTasks = new Task[]  pipeCompletionTask, sourceMoveTask 
                .Where(t => t != null)
                .ToList();

            if (!coreTasks.Any())
            
                break;
            

            await Task.WhenAny(coreTasks);

            if (sourceMoveTask != null && sourceMoveTask.IsCompleted)
            
                sourceFinished = !sourceMoveTask.Result;

                if (!sourceFinished)
                
                    try
                    
                        tasks.Add(selector(enumerator.Current));
                    
                    catch  
                

                sourceMoveTask = null;
            
            
            if (pipeCompletionTask != null && pipeCompletionTask.IsCompleted)
            
                var completedTask = pipeCompletionTask.Result;

                if (completedTask.IsCompletedSuccessfully)
                
                    yield return completedTask.Result;
                

                tasks.Remove(completedTask);
                pipeCompletionTask = null;
            
        
    
    finally
    
        await enumerator.DisposeAsync();
    

可以像下面这样使用:

    static async Task Main(string[] args)
    
        var source = GetIds();
        var strs = source.SelectParallel(Map);

        await foreach (var str in strs)
        
            Console.WriteLine(str);
        
    

    static async IAsyncEnumerable<int> GetIds()
    
        foreach (var i in Enumerable.Range(1, 20))
        
            await Task.Delay(200);
            yield return i;
        
    

    static async Task<string> Map(int id)
    
        await Task.Delay(rnd.Next(1000, 2000));
        return $"id_Thread.CurrentThread.ManagedThreadId";
    

可能的输出:

[6:31:03 PM] 1_5
[6:31:03 PM] 2_6
[6:31:04 PM] 3_6
[6:31:04 PM] 6_4
[6:31:04 PM] 5_4
[6:31:04 PM] 4_5
[6:31:05 PM] 8_6
[6:31:05 PM] 7_6
[6:31:05 PM] 11_6
[6:31:05 PM] 10_4
[6:31:05 PM] 9_6
[6:31:06 PM] 14_6
[6:31:06 PM] 12_4
[6:31:06 PM] 13_4
[6:31:06 PM] 15_4
[6:31:07 PM] 17_4
[6:31:07 PM] 20_4
[6:31:07 PM] 16_6
[6:31:07 PM] 18_6
[6:31:08 PM] 19_6

【讨论】:

以上是关于如何使用 C#8 IAsyncEnumerable<T> 来异步枚举并行运行的任务的主要内容,如果未能解决你的问题,请参考以下文章

C# 8.0 是不是支持 IAsyncEnumerable?

如何等待来自 IAsyncEnumerable<> 的所有结果?

在 C#8 IAsyncEnumerable<T> 中并行化收益返回

迭代时如何突破 IAsyncEnumerable?

如何实现一个高效的 WhenEach 流式传输任务结果的 IAsyncEnumerable?

使用 IAsyncEnumerable 读取文本文件