如何防止 CPU “最大化”:同步方法异步调用多个工作人员并使用 SemaphoreSlim 进行节流?

Posted

技术标签:

【中文标题】如何防止 CPU “最大化”:同步方法异步调用多个工作人员并使用 SemaphoreSlim 进行节流?【英文标题】:How do I prevent "maxing out" of CPU: Synchronous method calling multiple workers asynchronously & throttling using SemaphoreSlim? 【发布时间】:2019-12-25 15:03:28 【问题描述】:

我目前正在优化一个现有的、非常缓慢且超时的生产应用程序。 没有重写的选项

简而言之,它是一个 WCF 服务,当前依次调用其他 4 个“worker”WCF 服务。任何工作人员服务都不依赖于其他工作人员的结果。 因此我们希望它一次调用它们(而不是顺序调用)。我会重申,我们没有重写它的奢侈。

优化涉及使其一次调用所有工作服务。这就是我想到异步的地方。

我在异步编程方面的经验有限,但就我的解决方案而言,我已经尽可能广泛地阅读了该主题。

问题是,在测试中,它可以工作,但会耗尽我的 CPU。感谢您的帮助

以下是主要 WCF 服务中基本代码的简化版本

// The service operation belonging to main WCF Service
public void ProcessAllPendingWork()

    var workerTasks = new List<Task<bool>>();
    foreach(var workerService in _workerServices)
    
        //DoWorkAsync is the worker method with the following signature:
        // Task<bool> DoWorkAsync()

        var workerTask = workerService.DoWorkAsync()
        workerTasks.Add(workerTask);
    

    var task = Task.Run(async ()=>
    
        await RunWorkerTasks(workerTasks);
    );
    task.Wait();




private async RunWorkerTasks(IEnumerable<Tast<bool>> workerTasks)

    using(var semaphore = new SemaphoreSlim(initialCount:3))
    

        foreach (var workerTask in workerTasks)
        
            await semaphore.WaitAsync();
            try
            
                await workerTask;
            
            catch (System.Exception)
            
                //assume 'Log' is a predefined logging service
                Log.Error(ex);
            
        
    
 

我读过的:

Multiple ways how to limit parallel tasks processing

How to limit the amount of concurrent async I/O operations?

Approaches for throttling asynchronous methods in C#

Constraining Concurrent Threads in C#

Limiting Number of Concurrent Threads With SemaphoresSlim

Async WCF call with ChannelFactory and CreateChannel

【问题讨论】:

除非我遗漏了什么,在填充了您的workerTasks 列表后,您可以调用await Task.WhenAll(workerTasks) 并删除整个RunWorkerTasks 部分 我怀疑您需要在开始任务之前等待信号量,而不是之后Parallel.ForEach(与MaxDegreesOfParallelism)也可能值得考虑。 Using async/await for multiple tasks的可能重复 最大化 CPU 可能是个好消息,如果整个完成时间相应减少的话。并行运行的次数越多,使用的 CPU 就越多。 没有关于为什么它不应该最大化 CPU 的信息。没有关于工人工作种类的信息:IO/CPU bound。仍然期待一个有用的答案。投反对票。 【参考方案1】:

你没有解释你想如何限制并发调用。您想要运行 30 个并发工作任务,还是想要 30 个 WCF 调用,每个调用都有其所有工作任务并发运行,或者您希望并发 WCF 调用每个都有自己的并发工作任务限制?鉴于您说每个 WCF 调用只有 4 个工作任务并查看您的示例代码,我假设您希望全局限制 30 个并发工作任务。

首先,正如@mjwills 所暗示的,您需要使用 SemaphoreSlim 来限制对workerService.DoWorkAsync() 的调用。您的代码当前启动了所有这些,并且只尝试限制您等待完成的数量。我认为这就是你最大化 CPU 的原因。启动的工作任务的数量仍然是无限的。但请注意,您还需要在持有信号量时等待工作任务,否则您只会限制创建任务的速度,而不是同时运行的任务数量。

其次,您正在为每个 WCF 请求创建一个新的 SemaphoreSlim。因此,我在第一段中提出了问题。这会限制任何事情的唯一方法是,如果您的工作人员服务比初始计数多,在您的示例中为 30,但您说只有 4 个工作人员。要获得“全局”限制,您需要使用单例 SemaphoreSlim。

第三,您永远不会在 SemaphoreSlim 上调用 .Release(),因此,如果您确实将其设为单例,那么您的代码将在进程启动后启动 30 名工作人员后挂起。确保在 try-finally 块中执行此操作,这样如果 worker 崩溃,它仍然会被释放。

这是一些匆忙编写的示例代码:

public async Task ProcessAllPendingWork()

    var workerTasks = new List<Task<bool>>();
    foreach(var workerService in _workerServices)
    
        var workerTask = RunWorker(workerService);
        workerTasks.Add(workerTask);
    

    await Task.WhenAll(workerTasks);


private async Task<bool> RunWorker(Func<bool> workerService)

    // use singleton semaphore.
    await _semaphore.WaitAsync();
    try
    
        return await workerService.DoWorkAsync();
    
    catch (System.Exception)
    
        //assume error is a predefined logging service
        Log.Error(ex);
        return false; // ??
    
    finally
    
        _semaphore.Release();
    

【讨论】:

我已编辑,这是一个错字。它应该是3个线程。对此感到抱歉 @user919426 那么您是否希望每次对 WCF 服务的调用一次运行 3 个工作人员,所以 5 个并发 WCF 调用意味着 15 个工作人员?还是运行 3 个工作人员,不管有多少并发 WCF 调用?无论如何,它只会影响我的第二点。第一点和第三点仍然存在。 我希望最多同时调用 3 个工作人员。 在这种情况下,我的原始答案仍然是我回答您问题的最佳尝试。我懒得在我的答案中将 30 更改为 3,但我确实添加了一些示例代码来展示我认为您需要如何做。 正确执行异步是必要的,尽管异步旨在解决等待 IO,而不是执行并行 CPU 密集型工作。您可以将Task.WhenAll 更改为Task.WaitAll,这是一个阻塞API,因此不会更改签名。【参考方案2】:

TPL(Task parallel library)提供的Task抽象是对Thread的抽象;任务在线程池中排队,然后在执行程序可以管理该请求时执行。

换句话说,根据某些因素(您的流量、CPU 与 IO 绑定和部署模型),尝试在您的工作函数中执行托管任务可能根本不会带来任何好处(或者在某些情况下会更慢)。

话虽如此,我建议您使用Task.WaitAll(可从.NET 4.0 获得)使用非常高级的抽象来管理并发;特别是这段代码可能对您有用:

它创建工人并等待所有人 执行耗时10秒(最长的Worker) 它会捕获并为您提供管理异常的机会 [最后但并非最不重要的] 是一个声明性 API,它将您的注意力集中在该做什么而不是如何做上。
public class Q57572902

    public void ProcessAllPendingWork()
    
        var workers = new Action[] Worker1, Worker2, Worker3;

        try
        
            Task.WaitAll(workers.Select(Task.Factory.StartNew).ToArray());
            // ok
        
        catch (AggregateException exceptions)
        
            foreach (var ex in exceptions.InnerExceptions)
            
                Log.Error(ex);
            
            // ko
        
    

    public void Worker1() => Thread.Sleep(FromSeconds(5)); // do something

    public void Worker2() => Thread.Sleep(FromSeconds(10)); // do something

    public void Worker3() => throw new NotImplementedException("error to manage"); // something wrong


我从 cmets 看到,您最多需要 3 个工人同时运行;在这种情况下,您可以简单地从 TaskScheduler 文档中复制粘贴 LimitedConcurrencyLevelTaskScheduler

之后,您必须像这样使用其 onw TaskFactory 创建 sigleton 实例 TaskScheduler

public static class WorkerScheduler

    public static readonly TaskFactory Factory;

    static WorkerScheduler()
    
        var scheduler = new LimitedConcurrencyLevelTaskScheduler(3);
        Factory = new TaskFactory(scheduler);
    

之前的ProcessAllPendingWork() 代码保持不变,除了

...workers.Select(Task.Factory.StartNew)...

变成了

...workers.Select(WorkerScheduler.Factory.StartNew)...

因为您必须使用与您的自定义 WorkerScheduler 关联的 TaskFactory

如果您的工作人员需要返回一些数据以响应,则需要以不同的方式管理错误和数据,如下所示:

public void ProcessAllPendingWork()

    var workers = new Func<bool>[] Worker1, Worker2, Worker3;
    var tasks = workers.Select(WorkerScheduler.Factory.StartNew).ToArray();

    bool[] results = null;

    Task
        .WhenAll(tasks)
        .ContinueWith(x =>
        
            if (x.Status == TaskStatus.Faulted)
            
                foreach (var exception in x.Exception.InnerExceptions)
                    Log(exception);

                return;
            

            results = x.Result; // save data in outer scope
        )
        .Wait();

    // continue execution
    // results is now filled: if results is null, some errors occured

【讨论】:

感谢您的回答,但这与现有代码相差甚远,我无权更改。 如果您有顺序执行的工作人员,此解决方案似乎与您提供的 sn-p 最接近。我避免信号量、异步/等待靠近您现有的代码。以女巫的方式,这个解决方案对你来说似乎很遥远?【参考方案3】:

除非我错过了什么 - 您的示例代码并行运行所有工作人员。在调用“workerService.DoWorkAsync()”时,工作人员开始了它的工作。 'RunWorkerTasks' 只等待工作任务完成。 'DoWorkAsync()' 启动异步操作,而 'await' 暂停调用方法的执行,直到等待的任务完成。

CPU 使用率高的事实很可能是由于您的 workerService 的活动,而不是由于您调用它们的方式。为了验证这一点,请尝试将workerService.DoWorkAsync() 替换为Thread.Sleep(..)Task.Delay(..)。如果您的 CPU 使用率下降,则应归咎于工人。 (取决于 workerService 的作用)一旦并行运行它们,CPU 消耗可能会增加,甚至预期会增加。

关于如何限制并行执行的问题。请注意,以下示例并不完全使用 3 个线程,而是最多使用 3 个线程。

    Parallel.ForEach(
        _workerServices,
        new ParallelOptions  MaxDegreeOfParallelism = 3 ,
        workerService => workerService.DoWorkAsync()
            .ContinueWith(res => 
            
                // Handle your result or possible exceptions by consulting res.
            )
            .Wait());

正如您之前提到的,您的代码是按顺序执行的,我假设工作人员也有一个非异步等效项。使用它们可能更容易。同步调用异步方法很麻烦。我什至通过调用DoWorkAsync().Wait() 就遇到了死锁情况。关于How would I run an async Task<T> method synchronously? 的讨论很多。本质上,我试图避免它。如果这不可能,我会尝试使用ContinueWith,这会增加复杂性,或者使用之前 SO 讨论的AsyncHelper

    var results = new ConcurrentDictionary<WorkerService, bool>();
    Parallel.ForEach(
        _workerServices,
        new ParallelOptions  MaxDegreeOfParallelism = 3 ,
        workerService => 
            
                // Handle possible exceptions via try-catch.
                results.TryAdd(workerService, workerService.DoWork());
            );
    // evaluate results

Parallel.ForEach 利用线程或任务池。这意味着它将给定参数Action&lt;TSource&gt; body 的每次执行分派到专用线程上。您可以使用以下代码轻松验证这一点。如果Parallel.ForEach 已经在不同的线程上分派了工作,您可以简单地同步执行您的“昂贵”操作。任何异步操作都是不必要的,甚至会对运行时性能产生不良影响。

    Parallel.ForEach(
        Enumerable.Range(1, 4),
        m => Console.WriteLine(Thread.CurrentThread.ManagedThreadId));

这是我用来测试的演示项目,不依赖你的workerService。

    private static bool DoWork()
    
        Thread.Sleep(5000);
        Console.WriteLine($"done by Thread.CurrentThread.ManagedThreadId.");
        return DateTime.Now.Millisecond % 2 == 0;
    

    private static Task<bool> DoWorkAsync() => Task.Run(DoWork);

    private static void Main(string[] args)
    
        var sw = new Stopwatch();
        sw.Start();

        // define a thread-safe dict to store the results of the async operation
        var results = new ConcurrentDictionary<int, bool>();

        Parallel.ForEach(
            Enumerable.Range(1, 4), // this replaces the list of workers
            new ParallelOptions  MaxDegreeOfParallelism = 3 ,
            // m => results.TryAdd(m, DoWork()), // this is the alternative synchronous call
            m => DoWorkAsync().ContinueWith(res => results.TryAdd(m, res.Result)).Wait());

        sw.Stop();

        // print results
        foreach (var item in results)
        
            Console.WriteLine($"item.Key=item.Value");
        

        Console.WriteLine(sw.Elapsed.ToString());
        Console.ReadLine();
    

【讨论】:

感谢您的详细回答,但calling an async method synchronously is mostly a hassle. 是什么意思?为什么我应该在同步方法上使用Parallel.ForEach 而不是我的实现?与另一个相比有什么好处或坏处?....在您的演示项目中,我假设ConcurrentDicttionary&lt;int, bool&gt; 是一个线程安全字典来收集结果。如果我是对的或错的,请包括 cmets :)...我还假设 Enumerable.Range(1, 4) 是模拟我的工作服务集合。对吗? 关于您的Enumerable.Range(1,4) 的另一个问题,我假设它模拟了我的 WorkerService 集合。对于我的场景和问题,这样说是否正确,代码现在看起来如下Parallel.ForEach(_workerServices, new ParallelOptions MaxDegreeOfParallelism = 3 ,s =&gt; s.DoWorkAsync().ContinueWith(res =&gt; results.TryAdd(m, res.Result)).Wait()); ? ...否则我会尝试您的演示并提供反馈。 我试图通过更新问题来解决您的 cmets 问题。这种方法还是@zivkan 方法更好是个人品味。我相信 Parallel.ForEach 会根据给定的系统硬件和系统的繁忙程度来决定使用多少线程。 MaxDegreeOfParallelism 只是为该决定设置了一个上限。 嗨。您的答案最接近我们要使用的解决方案。我们决定采用这种方法。这不会降低其他人的讨论和回答的质量。谢谢! 也许我误解了这个问题:要实现的目标是每个 http 请求最多有 3 个活动工作者?还是整个主机最多 3 个活动工作人员?在第一种情况下这是一个很好的解决方案,而不是在第二种情况下

以上是关于如何防止 CPU “最大化”:同步方法异步调用多个工作人员并使用 SemaphoreSlim 进行节流?的主要内容,如果未能解决你的问题,请参考以下文章

异步IO和协程

C#多线程与异步的区别

异步和多线程

同步异步阻塞非阻塞 总结

如何在处理多个文件时最大化吞吐量

java 异步调用方法