如何限制运行的并行任务数量? [关闭]

Posted

技术标签:

【中文标题】如何限制运行的并行任务数量? [关闭]【英文标题】:How to limit number of parallel tasks running? [closed] 【发布时间】:2021-07-09 19:18:37 【问题描述】:

我的代码可以侦听文件夹并检测放入其中的新文件。 目前,每个删除的新文件都会调用WatcherOnCreated 方法,该方法会创建一个新任务来与其他文件并行加密文件。

我想限制并行文件加密的数量。

我尝试使用计数为 5 的信号量,这样无论放入多少文件,都只能同时进行 5 次加密。

但是,它不起作用,导致我的程序没有响应。

class Test

        private static LimitEncryptionSemaphore;

        public Test()
        
            LimitEncryptionSemaphore = new SemaphoreSlim(5);
        
        
        // some function which calls WatcherOnCreated
         
        private void WatcherOnCreated(string filePath, WatcherChangeTypes changeType)
        
            string fileName = Path.GetFileName(filePath);

            Logger.Debug($"A created item fileName was detected in drop folder.");

            LimitEncryptionSemaphore.WaitAsync();
            // FireAndForget calls Task.Run(Func<Task>) with the callback function
            TaskFactory.FireAndForget(async () =>
            
                try
                
                    await ExponentialBackoffPolicy.ExecuteAsync(async () =>
                    
                        using (DeviceData deviceData = ReadDeviceData(filePath))
                        
                            // Raise the event.
                            await OnDeviceDataAvailable(FolderDevice, deviceData);
                        
                    );

                    if (FileSystem.Exists(filePath))
                    
                        // Delete the file once the handler is done.
                        FileSystem.DeleteFile(filePath);

                        Logger.Debug($"filePath was deleted.");
                    
                
                catch (Exception ex)
                
                    Logger.Error(ex);
                
                finally
                
                    // Release semaphore
                    LimitEncryptionSemaphore.Release();
                
            );

        

【问题讨论】:

我认为您的问题会更多地与codereview.stackexchange.com 的主题有关,您应该尝试在那里发帖。 您的代码限制为 10 而不是 5。 请注意,虽然您已经编辑了您的问题以尽量减少基于意见的问题(这就是前两个接近投票的原因),但它仍然缺少可靠地重现您的问题的 minimal reproducible example描述,因此是第三次也是最后一次近距离投票。 【参考方案1】:

代码有几个问题:

    LimitEncryptionSemaphore.WaitAsync 返回的任务被忽略,而不是awaited。 LimitEncryptionSemaphore.WaitAsync 的调用方法与 LimitEncryptionSemaphore.Release 不同。

修复:

private void WatcherOnCreated(string filePath, WatcherChangeTypes changeType)

  string fileName = Path.GetFileName(filePath);
  Logger.Debug($"A created item fileName was detected in drop folder.");

  TaskFactory.FireAndForget(async () =>
  
    await LimitEncryptionSemaphore.WaitAsync();
    try
    
      ...
    
    catch (Exception ex)
    
      ...
    
    finally
    
      LimitEncryptionSemaphore.Release();
    
  );

【讨论】:

谢谢!我在 try 块的更深层方法(在 OnDeviceDataAvailable 内)中结束了使用 WaitAsync 和 Release。我还需要处理信号量吗? @FryingPan:如果您需要重新创建它所在的组件,那么可以。如果整个应用程序正在关闭,则无需这样做。 WaitAsyncRelease 在应用程序运行期间确实被多次调用的函数中(每次上传新文件时,都会调用 Task.Run 来加密文件)。我认为这就是“重新创建组件”的意思。因此,在这种情况下,我必须在发布后处理每个线程中的信号量,对吗? @FryingPan: 不。SemaphoreSlim 需要共享,并且只有在使用它完成所有操作后才能处理。【参考方案2】:

我认为,由于您无法控制丢弃的文件数量,因此最好使用并发队列和循环处理排队的条目。

在文件观察器事件队列中,队列中的项目并启动一个后台工作者来处理数据。如果队列太长,您可以启动一个新的后台进程,但是如果这样做,您可能会饿死服务器处理器。

您还可以使用 Parallel.For 进行循环,并带有一个参数,说明如果队列比您喜欢的长,在您的情况 5 中,您希望开始的最大值。

【讨论】:

【参考方案3】:

TaskScheduler 的 .Net 文档有一个实现具有最大并行度的调度程序的示例。请看一下 https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.taskscheduler?view=net-5.0

【讨论】:

以上是关于如何限制运行的并行任务数量? [关闭]的主要内容,如果未能解决你的问题,请参考以下文章

如何更改pyspark中的并行任务数

如何限制数据库的异步IO任务数量?

Python:多任务,并发,并行的理解及线程进程的对比

并发与并行

真正的并行处理[关闭]

Flink并行度