如何限制运行的并行任务数量? [关闭]
Posted
技术标签:
【中文标题】如何限制运行的并行任务数量? [关闭]【英文标题】:How to limit number of parallel tasks running? [closed] 【发布时间】:2021-07-09 19:18:37 【问题描述】:我的代码可以侦听文件夹并检测放入其中的新文件。
目前,每个删除的新文件都会调用WatcherOnCreated
方法,该方法会创建一个新任务来与其他文件并行加密文件。
我想限制并行文件加密的数量。
我尝试使用计数为 5 的信号量,这样无论放入多少文件,都只能同时进行 5 次加密。
但是,它不起作用,导致我的程序没有响应。
class Test
private static LimitEncryptionSemaphore;
public Test()
LimitEncryptionSemaphore = new SemaphoreSlim(5);
// some function which calls WatcherOnCreated
private void WatcherOnCreated(string filePath, WatcherChangeTypes changeType)
string fileName = Path.GetFileName(filePath);
Logger.Debug($"A created item fileName was detected in drop folder.");
LimitEncryptionSemaphore.WaitAsync();
// FireAndForget calls Task.Run(Func<Task>) with the callback function
TaskFactory.FireAndForget(async () =>
try
await ExponentialBackoffPolicy.ExecuteAsync(async () =>
using (DeviceData deviceData = ReadDeviceData(filePath))
// Raise the event.
await OnDeviceDataAvailable(FolderDevice, deviceData);
);
if (FileSystem.Exists(filePath))
// Delete the file once the handler is done.
FileSystem.DeleteFile(filePath);
Logger.Debug($"filePath was deleted.");
catch (Exception ex)
Logger.Error(ex);
finally
// Release semaphore
LimitEncryptionSemaphore.Release();
);
【问题讨论】:
我认为您的问题会更多地与codereview.stackexchange.com 的主题有关,您应该尝试在那里发帖。 您的代码限制为 10 而不是 5。 请注意,虽然您已经编辑了您的问题以尽量减少基于意见的问题(这就是前两个接近投票的原因),但它仍然缺少可靠地重现您的问题的 minimal reproducible example描述,因此是第三次也是最后一次近距离投票。 【参考方案1】:代码有几个问题:
-
从
LimitEncryptionSemaphore.WaitAsync
返回的任务被忽略,而不是await
ed。
LimitEncryptionSemaphore.WaitAsync
的调用方法与 LimitEncryptionSemaphore.Release
不同。
修复:
private void WatcherOnCreated(string filePath, WatcherChangeTypes changeType)
string fileName = Path.GetFileName(filePath);
Logger.Debug($"A created item fileName was detected in drop folder.");
TaskFactory.FireAndForget(async () =>
await LimitEncryptionSemaphore.WaitAsync();
try
...
catch (Exception ex)
...
finally
LimitEncryptionSemaphore.Release();
);
【讨论】:
谢谢!我在 try 块的更深层方法(在OnDeviceDataAvailable
内)中结束了使用 WaitAsync 和 Release。我还需要处理信号量吗?
@FryingPan:如果您需要重新创建它所在的组件,那么可以。如果整个应用程序正在关闭,则无需这样做。
我 WaitAsync
和 Release
在应用程序运行期间确实被多次调用的函数中(每次上传新文件时,都会调用 Task.Run 来加密文件)。我认为这就是“重新创建组件”的意思。因此,在这种情况下,我必须在发布后处理每个线程中的信号量,对吗?
@FryingPan: 不。SemaphoreSlim
需要共享,并且只有在使用它完成所有操作后才能处理。【参考方案2】:
我认为,由于您无法控制丢弃的文件数量,因此最好使用并发队列和循环处理排队的条目。
在文件观察器事件队列中,队列中的项目并启动一个后台工作者来处理数据。如果队列太长,您可以启动一个新的后台进程,但是如果这样做,您可能会饿死服务器处理器。
您还可以使用 Parallel.For 进行循环,并带有一个参数,说明如果队列比您喜欢的长,在您的情况 5 中,您希望开始的最大值。
【讨论】:
【参考方案3】:TaskScheduler 的 .Net 文档有一个实现具有最大并行度的调度程序的示例。请看一下 https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.taskscheduler?view=net-5.0
【讨论】:
以上是关于如何限制运行的并行任务数量? [关闭]的主要内容,如果未能解决你的问题,请参考以下文章