如何实现令牌系统以限制 C# 中处理器/IO 繁重的多线程任务的并发性?

Posted

技术标签:

【中文标题】如何实现令牌系统以限制 C# 中处理器/IO 繁重的多线程任务的并发性?【英文标题】:How to implement a token system for limiting concurrency of a processor/IO heavy multithreading Tasks in C#? 【发布时间】:2021-03-22 11:19:25 【问题描述】:

这是此问题的后续问题:How do you run a variable number of concurrent parametrizable infinite loop type of threads in C#?

假设我有一个值 taskLimit(假设为 20),其中没有在下面的 RunAsync 方法中创建的同时任务“MyTask”:

protected override async Task RunAsync(CancellationToken cancellationToken)

    var tasks = new List<Task>();
    try
    
        for (int i = 0; i < taskLimit; i++)
        
            tasks.Add(MyTask(cancellationToken, i);
        
        
        await Task.WhenAll(tasks);
    
    catch (Exception e)
    
        //Exception Handling
    

public async Task MyTask(CancellationToken cancellationToken, int a)

    while (true)
    
        cancellationToken.ThrowIfCancellationRequested();

        try
        
            //long running code, if possible check for cancellation using the token
            //Do something useful here. Very Processor and IO heavy. Takes 5-10 minutes to complete.
            //SomeHeavyTask can only concurrently run upto a limit of say 5. Implement a token system of sorts
            while(freeTokens<1)
            
            await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken);
            
            freeTokens = freeTokens-1;
            SomeHeavyTask(cancellationToken);
            freeTokens = freeTokens+1;
            
            //sleep for an independently parameterizable period, then wake up and repeat
            await Task.Delay(TimeSpan.FromHours(parametrizableTaskDelay[i]), cancellationToken);
        
        catch (Exception e)
        
            //Exception Handling
        
    

有可能做这样的事情吗?在 C# 中是否有更好的更正式的方法来实现相同的目标?请注意这个问题的本质是 freeTokens 比 taskLimit 少得多。而且每个MyTask只花10%的时间在SomeHeavyTask()上,大部分时间花在await Task.Delay()上。

【问题讨论】:

@PeterBons 如果你知道这个问题的答案,请告诉我。 你想要的是docs.microsoft.com/en-us/dotnet/api/…吗? 相关:How to limit the amount of concurrent async I/O operations? 你不能在多线程代码中做freeTokens = freeTokens-1;。不安全。 很多答案也可以在这里找到:***.com/questions/20355931/… 【参考方案1】:

您应该使用 Microsoft 的响应式框架(又名 Rx)- NuGet System.Reactive 并添加 using System.Reactive.Linq; - 然后您可以这样做:

int taskLimit = 500;
int maxConcurrent = 5;

IObservable<Unit> query =
    Observable
        .Range(0, taskLimit)
        .Select(x => Observable.FromAsync(ct => SomeHeavyTask(ct)))
        .Merge(maxConcurrent);
        
await query;

在我的书中使用起来要容易得多。

【讨论】:

这是一种我从未见过的有趣方式。 出于某种原因,OP 可能希望将 x 作为参数传递给 SomeHeavyTask @TheodorZoulias - 可能,但这不在他的代码中,所以我只是用它运行。【参考方案2】:

另一种选择:

var block = new ActionBlock<int>(x => SomeHeavyTask(cancellationToken, x), 
    new ExecutionDataflowBlockOptions() 
     
        MaxDegreeOfParallelism = 20,
        CancellationToken = cancellationToken
    );

for (int i = 0; i < 100; i++)
    await block.SendAsync(i, cancellationToken);
    
block.Complete();
await block.Completion;

【讨论】:

您可能应该将ActionBlock 配置为相同的cancellationToken,以使整个过程在取消时更具响应性。【参考方案3】:

您可以使用SemaphoreSlim 来限制同时工作的任务数量(您仍然有taskLimit 任务处于活动状态,但只有有限数量的任务会同时执行繁重的工作;我认为这是你想要什么)。

这可以通过一个示例控制台应用程序进行最佳演示。如果您运行它,您将从输出中看到最多 5 个“繁重任务”同时处于活动状态。

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Demo

    class Program
    
        static async Task Main()
        
            Console.WriteLine("Starting");

            // Cancel after 30 seconds for demo purposes.
            using var source = new CancellationTokenSource(TimeSpan.FromSeconds(30));
            await RunAsync(source.Token);

            Console.WriteLine("Stopped.");
            Console.ReadLine();
        

        public static async Task RunAsync(CancellationToken cancellationToken)
        
            int taskLimit = 20;
            int concurrencyLimit = 5;

            var sem   = new SemaphoreSlim(concurrencyLimit);
            var tasks = new List<Task>();

            try
            
                for (int i = 0; i < taskLimit; i++)
                
                    int p = i; // Prevent modified closure.
                    tasks.Add(Task.Run(() => MyTask(cancellationToken, p, sem)));
                

                await Task.WhenAll(tasks);
            

            catch (OperationCanceledException)
            
                Console.WriteLine("Task(s) were cancelled.");
            

            catch (Exception e)
            
                // Exception Handling
            
        

        public static async Task MyTask(CancellationToken cancellationToken, int a, SemaphoreSlim sem)
        
            while (true)
            
                cancellationToken.ThrowIfCancellationRequested();

                try
                
                    await sem.WaitAsync(cancellationToken);

                    try
                    
                        someHeavyTask(cancellationToken, a);
                    
                    
                    finally
                    
                        sem.Release();
                    
                

                catch (OperationCanceledException)
                
                    Console.WriteLine("Task was cancelled.");
                    return;
                

                catch (Exception e)
                
                    //Exception Handling
                
            
        

        static int heavyTaskCount;

        static void someHeavyTask(CancellationToken cancel, int a)
        
            int n = Interlocked.Increment(ref heavyTaskCount);
            Console.WriteLine("Starting heavy task. Number of simultaneous heavy tasks = " + n);

            // Simulate work. Make the work for each task take varying time by using 'a' for the sleep.

            for (int i = 0; i < 20 && !cancel.IsCancellationRequested; ++i)
            
                Thread.Sleep(100 + a*10);
            

            n = Interlocked.Decrement(ref heavyTaskCount);
            Console.WriteLine("Finishing heavy task. Number of simultaneous heavy tasks = " + n);
        
    

这里的核心是由代码中的信号量控制的:

await sem.WaitAsync(cancellationToken);

try

    someHeavyTask(cancellationToken, a);


finally

    sem.Release();

【讨论】:

【参考方案4】:

就像@mjwills 所说,您可以使用C# semaphore 来管理对资源的并发访问。 (random example)

我确实建议先查看现有解决方案。例如,Hangfire。 如果需要,您可以存储其state inside SF。

【讨论】:

以上是关于如何实现令牌系统以限制 C# 中处理器/IO 繁重的多线程任务的并发性?的主要内容,如果未能解决你的问题,请参考以下文章

如何在启用 UAC 的 Windows 服务中调用 LogonUser() 以获取不受限制的完整令牌?

如何实现用于生成视频聊天令牌的服务器 - vidyo.io

Socket.io 命名空间限制以及如何扩展它

如何将 QProcess 的执行与 QProgressBar 的推进联系起来以实现非常繁重的计算循环

GitHub OAuth2 令牌:如何限制访问以读取单个私有仓库

使用 Golang 实现简易的令牌桶算法