SemaphoreSlim 不会限制任务

Posted

技术标签:

【中文标题】SemaphoreSlim 不会限制任务【英文标题】:SemaphoreSlim is not throttling the tasks 【发布时间】:2020-01-30 05:52:38 【问题描述】:

我创建了以下方法 TestThrottled 来尝试限制我的任务,但是当我调用 WhenAll 并且此方法都具有相同的经过时间时,它根本没有限制。我做错什么了吗?

    private static async Task<T[]> TestThrottled<T>(List<Task<T>> tasks, int maxDegreeOfParallelism)
    
        var semaphore = new SemaphoreSlim(maxDegreeOfParallelism);
        var tasksParallelized = new List<Task<T>>();

        foreach (var task in tasks)
        
            var taskParallelized = Task.Run(async () =>
            
                try
                
                    await semaphore.WaitAsync();

                    return await task;
                
                finally
                
                    semaphore.Release();
                
            );
            tasksParallelized.Add(taskParallelized);
        

        return await Task.WhenAll(tasksParallelized);
    

    private static async Task<int> TestAsync()
    
        await Task.Delay(1000);

        return 1;
    

    static async Task Main(string[] args)
    
        var sw = Stopwatch.StartNew();

        var tasks = new List<Task<int>>();
        var ints = new List<int>();

        for (int i = 0; i < 30; i++)
        
            tasks.Add(TestAsync());
        
        ints.AddRange(await TestThrottled(tasks, 1));

        Console.WriteLine($"sw.ElapsedMilliseconds, count: ints.Count");
        Console.ReadLine();
    

【问题讨论】:

我不明白这个问题 我想限制并发任务,所以我根据该示例代码创建了该 PoC gist.github.com/kadukf/… 【参考方案1】:

您可以使用TPL DataFlow 实现此目的的另一种方式,它已经拥有您需要的一切,并且可以在需要时满足更复杂的流水线,并且具有更多的可配置性。它还可以节省您在示例解决方案中卸载到另一个任务的时间

private static async Task<IList<T>> TestThrottled<T>(IEnumerable<Func<Task<T>>> tasks, int maxDegreeOfParallelism)

   var options = new ExecutionDataflowBlockOptions()  EnsureOrdered = false, MaxDegreeOfParallelism = maxDegreeOfParallelism ;

   var transform = new TransformBlock<Func<Task<T>>, T>(func => func.Invoke(), options);
   var outputBufferBlock = new BufferBlock<T>();

   transform.LinkTo(outputBufferBlock, new DataflowLinkOptions()PropagateCompletion = true);

   foreach (var task in tasks)
      transform.Post(task);

   transform.Complete();
   await outputBufferBlock. Completion;

   outputBufferBlock.TryReceiveAll(out var result);

   return result;

【讨论】:

这是一个不错的解决方案。我宁愿等待BufferBlock 而不是TransformBlock 的完成,以避免可能的竞争条件。我还将tasks 参数重命名为taskFactories @TheodorZoulias 哎呀,你确实是对的。可能也可以在 RX 中更流畅地完成【参考方案2】:

这里的主要问题是async/await 的行为。当你打电话时会发生什么

private static async Task<int> TestAsync()
 
    await Task.Delay(1000);
    return 1;


TestAync();

TestAsync() 被调用。在该方法中,Task.Delay() 被调用。这将创建一个在 1000 毫秒后完成的任务。最后,您返回该任务(实际上,另一个任务被安排为Task.Delay() 返回的任务的延续)。

您在Main() 的循环中几乎同时创建所有这些任务。因此,尽管您可能有一个信号量来防止多个线程同时调用await task,但它们都被安排在大约同一时间完成。 await 仅在任务尚未完成时等待。因此,一旦第一个线程释放信号量(大约一秒钟后),下一个线程就可以进入临界区,在那里它会发现任务已经完成(或非常接近完成)。然后它可以立即释放信号量。其余任务也会发生这种情况,总运行时间约为一秒。

【讨论】:

我不确定我是否理解你,所以没有办法使用 semaphoreSlim 来限制,我想同时运行 maxDegreeOfParallelism,这个 PoC 的预期行为会是 30 秒 问题是任务在有任何信号量之前就开始了。您需要限制 start,而不是 waiting【参考方案3】:

我解决了我的问题(创建一个接收异步方法列表的通用限制任务运行程序),执行如下操作:

    private static async Task<T[]> RunAsyncThrottled<T>(IEnumerable<Func<Task<T>>> tasks, int maxDegreeOfParallelism)
    
        var tasksParallelized = new List<Task<T>>();

        using (var semaphore = new SemaphoreSlim(maxDegreeOfParallelism))
        
            foreach (var task in tasks)
            
                var taskParallelized = Task.Run(async () =>
                
                    await semaphore.WaitAsync();
                    try
                    
                        return await task.Invoke();
                    
                    finally
                    
                        semaphore.Release();
                    
                );
                tasksParallelized.Add(taskParallelized);
            

            return await Task.WhenAll(tasksParallelized);
        
    

    private static async Task<int> TestAsync(int num)
    
        await Task.Delay(1000);

        return 1 + num;
    

    static async Task Main(string[] args)
    
        var sw = Stopwatch.StartNew();

        var tasks = new List<Func<Task<int>>>();
        var ints = new List<int>();

        for (int i = 0; i < 10; i++)
        
            tasks.Add(() => TestAsync(12000));
        

        ints.AddRange(await RunAsyncThrottled(tasks, 1000));

        Console.WriteLine($"sw.Elapsed.TotalMilliseconds, count: ints.Count");
        Console.ReadLine();
    

【讨论】:

【参考方案4】:

解决这个问题的关键是让限制器启动任务,而不是预先启动它们。并且由于使用旧的Task.Start 方法显式启动任务非常受限制(早于并且不能利用异步等待机制),唯一的选择是让节流器创建任务。有多种方法可以做到这一点:

    传递任务工厂而不是任务。此方法已在其他答案中进行了演示。
private static async Task<TResult[]> RunAsyncThrottled<TResult>(
    IEnumerable<Func<Task<TResult>>> taskFactories,
    int maxDegreeOfParallelism)

    //...
    foreach (var taskFactory in taskFactories)
        //...
        var task = taskFactory();
        TResult result = await task;

    传递一系列项目和一个接受项目作为参数的任务工厂。这是最常用的方法:
private static async Task<TResult[]> RunAsyncThrottled<TSource, TResult>(
    IEnumerable<TSource> items, Func<TSource, Task<TResult>> taskFactory,
    int maxDegreeOfParallelism)

    //...
    foreach (var item in items)
        //...
        var task = taskFactory(item);
        TResult result = await task;

    传递延迟的可枚举任务。可以使用 LINQ 或迭代器(yield 的方法)创建这样的枚举。
private static async Task<TResult[]> RunAsyncThrottled<TResult>(
    IEnumerable<Task<TResult>> tasks, int maxDegreeOfParallelism)

    if (tasks is ICollection<Task<TResult>>) throw new ArgumentException(
        "The enumerable should not be materialized.", nameof(tasks));
    //...
    foreach (var task in tasks)
        //...
        TResult result = await task;

由于C# 8现在已经发布,方法的返回值有一个替代方案。它可以返回而不是返回Task&lt;TResult[]&gt; IAsyncEnumerable&lt;TResult&gt;,允许使用 await foreach 进行异步枚举。

private static async IAsyncEnumerable<TResult> RunAsyncThrottled<TSource, TResult>(
    IEnumerable<TSource> items, Func<TSource, Task<TResult>> taskFactory,
    int maxDegreeOfParallelism)

    //...
    foreach (var item in items)
        //...
        yield return await taskFactory(item);

【讨论】:

以上是关于SemaphoreSlim 不会限制任务的主要内容,如果未能解决你的问题,请参考以下文章

c# Task.WhenAll(tasks) 和 SemaphoreSlim - 如何知道所有任务何时已完全完成

具有动态 maxCount 的 SemaphoreSlim

在异步任务中并行使用 WebView2

多线程之信号量——SemaphoreSlim

多线程10-SemaphoreSlim

增加/减少 SemaphoreSlim 中可用插槽的数量