如何实现令牌系统以限制 C# 中处理器/IO 繁重的多线程任务的并发性?
Posted
技术标签:
【中文标题】如何实现令牌系统以限制 C# 中处理器/IO 繁重的多线程任务的并发性?【英文标题】:How to implement a token system for limiting concurrency of a processor/IO heavy multithreading Tasks in C#? 【发布时间】:2021-03-22 11:19:25 【问题描述】:这是此问题的后续问题:How do you run a variable number of concurrent parametrizable infinite loop type of threads in C#?
假设我有一个值 taskLimit(假设为 20),其中没有在下面的 RunAsync 方法中创建的同时任务“MyTask”:
protected override async Task RunAsync(CancellationToken cancellationToken)
var tasks = new List<Task>();
try
for (int i = 0; i < taskLimit; i++)
tasks.Add(MyTask(cancellationToken, i);
await Task.WhenAll(tasks);
catch (Exception e)
//Exception Handling
public async Task MyTask(CancellationToken cancellationToken, int a)
while (true)
cancellationToken.ThrowIfCancellationRequested();
try
//long running code, if possible check for cancellation using the token
//Do something useful here. Very Processor and IO heavy. Takes 5-10 minutes to complete.
//SomeHeavyTask can only concurrently run upto a limit of say 5. Implement a token system of sorts
while(freeTokens<1)
await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken);
freeTokens = freeTokens-1;
SomeHeavyTask(cancellationToken);
freeTokens = freeTokens+1;
//sleep for an independently parameterizable period, then wake up and repeat
await Task.Delay(TimeSpan.FromHours(parametrizableTaskDelay[i]), cancellationToken);
catch (Exception e)
//Exception Handling
有可能做这样的事情吗?在 C# 中是否有更好的更正式的方法来实现相同的目标?请注意这个问题的本质是 freeTokens 比 taskLimit 少得多。而且每个MyTask只花10%的时间在SomeHeavyTask()上,大部分时间花在await Task.Delay()上。
【问题讨论】:
@PeterBons 如果你知道这个问题的答案,请告诉我。 你想要的是docs.microsoft.com/en-us/dotnet/api/…吗? 相关:How to limit the amount of concurrent async I/O operations? 你不能在多线程代码中做freeTokens = freeTokens-1;
。不安全。
很多答案也可以在这里找到:***.com/questions/20355931/…
【参考方案1】:
您应该使用 Microsoft 的响应式框架(又名 Rx)- NuGet System.Reactive
并添加 using System.Reactive.Linq;
- 然后您可以这样做:
int taskLimit = 500;
int maxConcurrent = 5;
IObservable<Unit> query =
Observable
.Range(0, taskLimit)
.Select(x => Observable.FromAsync(ct => SomeHeavyTask(ct)))
.Merge(maxConcurrent);
await query;
在我的书中使用起来要容易得多。
【讨论】:
这是一种我从未见过的有趣方式。 出于某种原因,OP 可能希望将x
作为参数传递给 SomeHeavyTask
。
@TheodorZoulias - 可能,但这不在他的代码中,所以我只是用它运行。【参考方案2】:
另一种选择:
var block = new ActionBlock<int>(x => SomeHeavyTask(cancellationToken, x),
new ExecutionDataflowBlockOptions()
MaxDegreeOfParallelism = 20,
CancellationToken = cancellationToken
);
for (int i = 0; i < 100; i++)
await block.SendAsync(i, cancellationToken);
block.Complete();
await block.Completion;
【讨论】:
您可能应该将ActionBlock
配置为相同的cancellationToken
,以使整个过程在取消时更具响应性。【参考方案3】:
您可以使用SemaphoreSlim
来限制同时工作的任务数量(您仍然有taskLimit
任务处于活动状态,但只有有限数量的任务会同时执行繁重的工作;我认为这是你想要什么)。
这可以通过一个示例控制台应用程序进行最佳演示。如果您运行它,您将从输出中看到最多 5 个“繁重任务”同时处于活动状态。
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
class Program
static async Task Main()
Console.WriteLine("Starting");
// Cancel after 30 seconds for demo purposes.
using var source = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await RunAsync(source.Token);
Console.WriteLine("Stopped.");
Console.ReadLine();
public static async Task RunAsync(CancellationToken cancellationToken)
int taskLimit = 20;
int concurrencyLimit = 5;
var sem = new SemaphoreSlim(concurrencyLimit);
var tasks = new List<Task>();
try
for (int i = 0; i < taskLimit; i++)
int p = i; // Prevent modified closure.
tasks.Add(Task.Run(() => MyTask(cancellationToken, p, sem)));
await Task.WhenAll(tasks);
catch (OperationCanceledException)
Console.WriteLine("Task(s) were cancelled.");
catch (Exception e)
// Exception Handling
public static async Task MyTask(CancellationToken cancellationToken, int a, SemaphoreSlim sem)
while (true)
cancellationToken.ThrowIfCancellationRequested();
try
await sem.WaitAsync(cancellationToken);
try
someHeavyTask(cancellationToken, a);
finally
sem.Release();
catch (OperationCanceledException)
Console.WriteLine("Task was cancelled.");
return;
catch (Exception e)
//Exception Handling
static int heavyTaskCount;
static void someHeavyTask(CancellationToken cancel, int a)
int n = Interlocked.Increment(ref heavyTaskCount);
Console.WriteLine("Starting heavy task. Number of simultaneous heavy tasks = " + n);
// Simulate work. Make the work for each task take varying time by using 'a' for the sleep.
for (int i = 0; i < 20 && !cancel.IsCancellationRequested; ++i)
Thread.Sleep(100 + a*10);
n = Interlocked.Decrement(ref heavyTaskCount);
Console.WriteLine("Finishing heavy task. Number of simultaneous heavy tasks = " + n);
这里的核心是由代码中的信号量控制的:
await sem.WaitAsync(cancellationToken);
try
someHeavyTask(cancellationToken, a);
finally
sem.Release();
【讨论】:
【参考方案4】:就像@mjwills 所说,您可以使用C# semaphore 来管理对资源的并发访问。 (random example)
我确实建议先查看现有解决方案。例如,Hangfire。 如果需要,您可以存储其state inside SF。
【讨论】:
以上是关于如何实现令牌系统以限制 C# 中处理器/IO 繁重的多线程任务的并发性?的主要内容,如果未能解决你的问题,请参考以下文章
如何在启用 UAC 的 Windows 服务中调用 LogonUser() 以获取不受限制的完整令牌?
如何将 QProcess 的执行与 QProgressBar 的推进联系起来以实现非常繁重的计算循环