使用队列的线程可重用性(线程之间的双向通信)

Posted

技术标签:

【中文标题】使用队列的线程可重用性(线程之间的双向通信)【英文标题】:Thread reusability using a Queue (two way communication between threads) 【发布时间】:2021-11-10 05:09:44 【问题描述】:

我有一个控制器线程,它以 20 个任务的批量对任务进行排队,并且有 n 个任务。现在我有 20 个线程来执行每个任务,如果每个线程都完成了任务,我想再次重用它以从队列中获取任务。我是线程同步的新手,所以请原谅我。我无法重用线程。它正在执行 1 个批处理,即 20 个线程执行并完成。

Queue<Action> queueComputersToScan = new Queue<Action>();
ManualResetEvent manualResetEvent = new ManualResetEvent(false);
int batchsize = 20;
Thread controllerThread = new Thread(() =>

    for (int x = 0; x < Math.Ceiling((decimal)GetListOfComputers().Count / batchsize); x++)
    
        List<ComputerInformation> computers = new List<ComputerInformation>
            (GetListOfComputers().Skip(x * batchsize).Take(batchsize));

        foreach (var computer in computers)
        
            queueComputersToScan.Enqueue(() => ScanComputer(computer));
        

        // when queue will have 20 jobs this event signal child threads to work on it 
        manualResetEvent.Set();
    
);
controllerThread.Start();`

20 个工作线程的代码:

int maxThread = 20;
for (int i = 0; i < maxThread; i++)

    List<FileInformation> fileInformationPerComputer = new List<FileInformation>();
    string threadName = string.Empty;
    Thread thread = new Thread(() =>
    
        lock (syncLock)
        
            if (manualResetEvent.WaitOne())
            
                if (queueComputersToScan.Count > 0)
                
                    Action scanComputerJob = queueComputersToScan.Dequeue();
                    if (scanComputerJob != null)
                    
                        scanComputerJob();
                    
                
            
        
    );
    thread.Name = "Thread on" + threadName;
    thread.Start();
    Console.WriteLine(thread.Name.ToLower() + " has started");
    threadsPerComputer.Add(thread);

上述程序的输出是它从队列中获取 20 个作业,然后停止从队列中获取其他作业。这是一种通信方式,工作线程不再从队列中获取(我想修复)。

我对控制器线程和工作线程之间的双向通信或相互通信有点困惑。我想实现这个 控制器线程获取 20 个作业,然后控制器停止并向 20 个线程发出信号以处理 20 个作业。 当工人线程完成 20 个作业时,工人停止并向控制器线程发出信号以获取接下来的 20 个作业,控制器线程停止并再次向工人发出信号以再次处理 20 个作业,直到队列变空。

【问题讨论】:

TL;DR 但由于代码未使用 ConcurrentQueue,因此不太可能正确。 也许可以看看TPL dataflow 作为替代 @AlexeiLevenkov ya 在 2021 年,您通常不应该想“我将创建线程、锁和管理同步等”。您应该考虑“有哪些高级抽象可以让我处理我的问题而不是线程机制”。对于初学者来说,你的所有线程似乎只会按顺序运行,因为它们都在争夺锁并在其中完成所有工作。这是不对的。 @Damien_The_Unbeliever 一开始都是初学者 :) 【参考方案1】:

您应该使用 Microsoft 的响应式框架(又名 Rx)- NuGet System.Reactive 并添加 using System.Reactive.Linq; - 然后您可以这样做:

int batchsize = 20;

IObservable<Unit> query =
    GetListOfComputers()
        .ToObservable()
        .Select(computer => Observable.Start(() => ScanComputer(computer)))
        .Merge(batchsize);

IDisposable subscription = query.Subscribe();

就是这样。

【讨论】:

以上是关于使用队列的线程可重用性(线程之间的双向通信)的主要内容,如果未能解决你的问题,请参考以下文章

使用 QRunnable 进行线程处理 - 发送双向回调的正确方式

0515线程

Python collections系列之双向队列

线程间通信都有哪些方式

thread_Exchanger数据交换

12.2多线程通信:queue