System.Threading.Tasks - 限制并发任务的数量

Posted

技术标签:

【中文标题】System.Threading.Tasks - 限制并发任务的数量【英文标题】:System.Threading.Tasks - Limit the number of concurrent Tasks 【发布时间】:2011-02-23 07:48:34 【问题描述】:

我刚刚开始研究 .Net 4.0 中新的“System.Threading.Tasks”优点,想知道是否有任何构建支持限制同时运行的并发任务数量,或者如果这应该手动处理。

E.G: 如果我需要调用一个计算方法 100 次,有没有办法设置 100 个 Tasks,但同时只执行 5 个?答案可能只是创建 5 个任务,调用 Task.WaitAny,并在前一个任务完成时创建一个新任务。如果有更好的方法,我只是想确保我不会错过任何技巧。

基本上,有没有内置的方法可以做到这一点:

Dim taskArray() = New Task(Function() DoComputation1()),
                   New Task(Function() DoComputation2()),
                   ...
                   New Task(Function() DoComputation100())

Dim maxConcurrentThreads As Integer = 5
RunAllTasks(taskArray, maxConcurrentThreads)

感谢您的帮助。

【问题讨论】:

您能否详细说明为什么需要将其限制为 5?请注意,任务调度程序不会同时启动所有 100 个,它在内部使用线程池(或线程池使用任务系统),因此它将并发任务的数量限制在很小的范围内,但它可能会改变,它可能与系统中的核心数量有关,但知道为什么要限制在特定数量可能会给出一些很好的答案。 计算实际上调用了一个 web 服务作为其操作的一部分。这使 Web 服务不堪重负。 5 只是一个例子。 并行怎么样? ***.com/questions/5009181/… 【参考方案1】:

我知道这已经快一年了,但我找到了一种更简单的方法来实现这一点,所以我想我会分享:

Dim actionsArray() As Action = 
     new Action()
         New Action(Sub() DoComputation1()),
         New Action(Sub() DoComputation2()),
         ...
         New Action(Sub() DoComputation100())
      

System.Threading.Tasks.Parallel.Invoke(New Tasks.ParallelOptions() With .MaxDegreeOfParallelism = 5, actionsArray)

瞧!

【讨论】:

【参考方案2】:

我知道这是一个旧线程,但我只是想分享我对这个问题的解决方案:使用信号量。

(这是在 C# 中)

private void RunAllActions(IEnumerable<Action> actions, int maxConcurrency)

    using(SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
    
        foreach(Action action in actions)
        
            Task.Factory.StartNew(() =>
            
                concurrencySemaphore.Wait();
                try
                
                    action();
                
                finally
                
                    concurrencySemaphore.Release();
                
            );
        
    

【讨论】:

感谢 Arrow_Raider。这是一个更好的解决方案。我实现了这一点,但使用“延续任务”来处理信号量释放。 我在执行代码时收到此错误““信号量已被释放。”。 我将@James 的想法提升到了一个新的水平。我在一个延续中调用了 release 并在父任务的延续中调用了 dispose。 这不会产生很多任务吗? concurrencySemaphore.Wait() 在新任务 lambda 块内。 @Abdul - 您收到该错误是因为一旦开始最后一个操作,就没有什么可以阻止在 concurrencySemaphore 上调用 Dispose。您可以通过在执行所有任务时在 Dispose 之前阻塞来解决此问题。或者,拉比建议有一个在延续上调用 Dispose 的父任务可以解决问题。【参考方案3】:

一个解决方案可能是查看 Microsoft here 的预制代码。

描述是这样的:“提供一个任务调度程序,在线程池之上运行时确保最大并发级别。”据我所知,它似乎可以解决问题,在与 ParallelOptions 中的 MaxDegreeOfParallelism 属性相同。

【讨论】:

【参考方案4】:

James

提供的示例的 C# 等效项
Action[] actionsArray = new Action[] 
new Action(() => DoComputation1()),
new Action(() => DoComputation2()),
    //...
new Action(() => DoComputation100())
  ;

   System.Threading.Tasks.Parallel.Invoke(new Tasks.ParallelOptions MaxDegreeOfParallelism =  5 , actionsArray)

【讨论】:

【参考方案5】:

My blog post 展示了如何使用任务和操作来执行此操作,并提供了一个示例项目,您可以下载并运行以查看两者的实际效果。

有动作

如果使用 Actions,您可以使用内置的 .Net Parallel.Invoke 函数。这里我们限制它最多并行运行 5 个线程。

var listOfActions = new List<Action>();
for (int i = 0; i < 100; i++)

    // Note that we create the Action here, but do not start it.
    listOfActions.Add(() => DoSomething());


var options = new ParallelOptions MaxDegreeOfParallelism = 5;
Parallel.Invoke(options, listOfActions.ToArray());

有任务

由于您在这里使用的是任务,因此没有内置函数。但是,您可以使用我在博客上提供的那个。

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static void StartAndWaitAllThrottled(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
    
        StartAndWaitAllThrottled(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
    

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static void StartAndWaitAllThrottled(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
    
        // Convert to a list of tasks so that we don&#39;t enumerate over it multiple times needlessly.
        var tasks = tasksToRun.ToList();

        using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
        
            var postTaskTasks = new List<Task>();

            // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
            tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

            // Start running each task.
            foreach (var task in tasks)
            
                // Increment the number of tasks currently running and wait if too many are running.
                throttler.Wait(timeoutInMilliseconds, cancellationToken);

                cancellationToken.ThrowIfCancellationRequested();
                task.Start();
            

            // Wait for all of the provided tasks to complete.
            // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler&#39;s using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
            Task.WaitAll(postTaskTasks.ToArray(), cancellationToken);
        
    

然后创建任务列表并调用函数让它们运行,例如一次最多同时运行 5 个,您可以这样做:

var listOfTasks = new List<Task>();
for (int i = 0; i < 100; i++)

    var count = i;
    // Note that we create the Task here, but do not start it.
    listOfTasks.Add(new Task(() => Something()));

Tasks.StartAndWaitAllThrottled(listOfTasks, 5);

【讨论】:

太棒了!只有一个问题:在您的情况下,没有任务结果。假设每个任务都返回一个对象,并且您想从 StartAndWaitAllThrottled 方法返回一个对象列表。您将如何修改当前代码?【参考方案6】:

简短回答:如果您想要限制工作任务的数量,以免它们使您的网络服务饱和,那么我认为您的方法很好。

详细回答: .NET 4.0 中的新 System.Threading.Tasks 引擎在 .NET ThreadPool 之上运行。因为每个进程只有一个 ThreadPool 并且默认为最多 250 个工作线程。因此,如果您将 ThreadPool 的最大线程数设置为一个更适中的数字,您可能能够减少并发执行的线程数,从而减少使用 ThreadPool.SetMaxThreads (...) API 的任务数。

但是,请注意,您可能并不孤单地使用 ThreadPool,因为您使用的许多其他类也可能将项目排队到 ThreadPool。因此,这样做很有可能最终会破坏应用程序的其余部分。另请注意,由于 ThreadPool 采用一种算法来优化其对给定机器的底层内核的使用,因此将线程池可以排队的线程数限制为任意低的数量可能会导致一些灾难性的性能问题。

同样,如果您想执行少量工作任务/线程来执行某些任务,那么最好只创建少量任务(相对于 100 个)。

【讨论】:

【参考方案7】:

它看起来不像,尽管您可以创建一个 TaskScheduler 的子类来实现这种行为。

【讨论】:

【参考方案8】:

如果您的程序使用 web 服务,同时连接数将被限制为 ServicePointManager.DefaultConnectionLimit 属性。如果您想要 5 个同时连接,使用 Arrow_Raider 的解决方案是不够的。您还应该增加ServicePointManager.DefaultConnectionLimit,因为它默认只有2。

【讨论】:

这个问题实际上与 HTTP 请求没有任何关系,但更笼统。我认为这个答案更适合解决特定于 HTTP 请求的问题。

以上是关于System.Threading.Tasks - 限制并发任务的数量的主要内容,如果未能解决你的问题,请参考以下文章

MathNet.Numerics 和 Parse 中的 System.Threading.Tasks 冲突

MySQLConnector DLL 错误 System.Threading.Tasks.Extensions 链接

无法将 system.threading.tasks.task 转换为 system.collections.generic.list [重复]

Unity中的异步编程——在Unity中使用 C#原生的异步(Task,await,async) - System.Threading.Tasks

无法将类型“string”隐式转换为“System.Threading.Tasks.Task<string>”

参数 1:无法从 'System.Threading.Tasks.Task<Project.Models.Booking>' 转换为 Project.Models.Booking