使用队列的线程可重用性(线程之间的双向通信)
Posted
技术标签:
【中文标题】使用队列的线程可重用性(线程之间的双向通信)【英文标题】:Thread reusability using a Queue (two way communication between threads) 【发布时间】:2021-11-10 05:09:44 【问题描述】:我有一个控制器线程,它以 20 个任务的批量对任务进行排队,并且有 n 个任务。现在我有 20 个线程来执行每个任务,如果每个线程都完成了任务,我想再次重用它以从队列中获取任务。我是线程同步的新手,所以请原谅我。我无法重用线程。它正在执行 1 个批处理,即 20 个线程执行并完成。
Queue<Action> queueComputersToScan = new Queue<Action>();
ManualResetEvent manualResetEvent = new ManualResetEvent(false);
int batchsize = 20;
Thread controllerThread = new Thread(() =>
for (int x = 0; x < Math.Ceiling((decimal)GetListOfComputers().Count / batchsize); x++)
List<ComputerInformation> computers = new List<ComputerInformation>
(GetListOfComputers().Skip(x * batchsize).Take(batchsize));
foreach (var computer in computers)
queueComputersToScan.Enqueue(() => ScanComputer(computer));
// when queue will have 20 jobs this event signal child threads to work on it
manualResetEvent.Set();
);
controllerThread.Start();`
20 个工作线程的代码:
int maxThread = 20;
for (int i = 0; i < maxThread; i++)
List<FileInformation> fileInformationPerComputer = new List<FileInformation>();
string threadName = string.Empty;
Thread thread = new Thread(() =>
lock (syncLock)
if (manualResetEvent.WaitOne())
if (queueComputersToScan.Count > 0)
Action scanComputerJob = queueComputersToScan.Dequeue();
if (scanComputerJob != null)
scanComputerJob();
);
thread.Name = "Thread on" + threadName;
thread.Start();
Console.WriteLine(thread.Name.ToLower() + " has started");
threadsPerComputer.Add(thread);
上述程序的输出是它从队列中获取 20 个作业,然后停止从队列中获取其他作业。这是一种通信方式,工作线程不再从队列中获取(我想修复)。
我对控制器线程和工作线程之间的双向通信或相互通信有点困惑。我想实现这个 控制器线程获取 20 个作业,然后控制器停止并向 20 个线程发出信号以处理 20 个作业。 当工人线程完成 20 个作业时,工人停止并向控制器线程发出信号以获取接下来的 20 个作业,控制器线程停止并再次向工人发出信号以再次处理 20 个作业,直到队列变空。
【问题讨论】:
TL;DR 但由于代码未使用 ConcurrentQueue,因此不太可能正确。 也许可以看看TPL dataflow 作为替代 @AlexeiLevenkov ya 在 2021 年,您通常不应该想“我将创建线程、锁和管理同步等”。您应该考虑“有哪些高级抽象可以让我处理我的问题而不是线程机制”。对于初学者来说,你的所有线程似乎只会按顺序运行,因为它们都在争夺锁并在其中完成所有工作。这是不对的。 @Damien_The_Unbeliever 一开始都是初学者 :) 【参考方案1】:您应该使用 Microsoft 的响应式框架(又名 Rx)- NuGet System.Reactive
并添加 using System.Reactive.Linq;
- 然后您可以这样做:
int batchsize = 20;
IObservable<Unit> query =
GetListOfComputers()
.ToObservable()
.Select(computer => Observable.Start(() => ScanComputer(computer)))
.Merge(batchsize);
IDisposable subscription = query.Subscribe();
就是这样。
【讨论】:
以上是关于使用队列的线程可重用性(线程之间的双向通信)的主要内容,如果未能解决你的问题,请参考以下文章