循环中的线程池队列用户工作项 |拆分为子线程

Posted

技术标签:

【中文标题】循环中的线程池队列用户工作项 |拆分为子线程【英文标题】:ThreadPool QueueUserWorkItem in loop | Splitting to Child threads 【发布时间】:2020-10-16 04:28:15 【问题描述】:

我有一个主线程,它从我的数据库收集活动作业以进行处理。 我想将此作业拆分为多线程,而不会出现任何崩溃或高 CPU 问题。

所以我选择了使用线程池的方式。我的主线程被另一个线程调用,工作原理如下:

    private void startChipDataManagment()
    
        fillQueu();
        ThreadPool.SetMinThreads(_threadCount, _threadCount);
        ThreadPool.SetMaxThreads(_threadCount, _threadCount);
        do
        
            for (int i = 0; i < MainJobsQueuList.Count; i++)
            
                job CurrentJob = MainJobsQueuList[i];
                if (isSystemAlive())
                
                    JobDataGenerationThread singleJobThread = new JobDataGenerationThread(CurrentJob.id, this);
                    ThreadPool.QueueUserWorkItem(new WaitCallback(singleJobThread.processJob), singleJobThread);
                    Thread.Sleep(0);
                
            

            while (_jobQueueCount > 0)
            
                Thread.Sleep(10);
            
            if (MainJobsQueuList.Count > 0)
            
                Thread.Sleep(1);
                MainJobsQueuList.Clear();
                _jobQueueCount = 0;
            
            Thread.Sleep(FILE_PROCESSED_AFTER_WAIT_TIME);
            fillQueu();
            
         while (true);
     

    private object syncLock = new object();
    public object syncLockChild = new object();
    int _jobQueueCount = 0;

    public void increaseThreadCount()
    
        lock (syncLock)
        
            _jobQueueCount++;
        
        Console.WriteLine("Count increased. Current Count : 0", _jobQueueCount);
    
    public void decreaseThreadCount()
    
        lock (syncLock)
        
            _jobQueueCount--;
        
        Console.WriteLine("Count decreased. Current Count : 0", _jobQueueCount);
    

我的 SingleJobThread 类如下:

    public JobDataGenerationThread(long assignedJobId, DataGenerationManagerThreadPoolNew mainThreadPool)
    
        dbContextThread = new emvEntities();
        //currentJob = assignedJob;
        currentJobId = assignedJobId;
        CurrentAppSettings = new Settings();
        CurrentAppSettings.ReadSettings();

        SettingsManager SM = new SettingsManager();
        Deskey = SM.GetDesKey();

        _mainThreadPool = mainThreadPool;
    

    public void processJob(object obj)
    
        TimeSpan stTime = DateTime.Now.TimeOfDay;
        job SingleJob = /*currentJob;*/ GetSelectedJob(currentJobId);
        try
                        
            _mainThreadPool.increaseThreadCount();
            /*Do some work.. Change flags of some Jobs*/
        
        catch (Exception ext) 
            LogGenerator.WriteLog(NLog.LogLevel.Error, 1, LogGenerator.ModuleType.DataEngine, "I see Error : " + ext.ToString() + "MY JOB ID :" + currentJobId, true);
        
        finally
        
            try
            
                SingleJob.isDataProcessingStarted = true;
                SingleJob.creationdate = DateTime.Now;
                dbContextThread.SaveChanges();
                _mainThreadPool.decreaseThreadCount();
                TimeSpan etTime = DateTime.Now.TimeOfDay;
                LogGenerator.WriteLog(NLog.LogLevel.Trace, 1, LogGenerator.ModuleType.DataEngine, string.Format("I have Taken total: 0 seconds. My JOB ID is : 1", etTime - stTime, currentJobId), true);
                //Console.WriteLine("I have taken total : 0 seconds", etTime - stTime);
            
            catch (Exception excpt)
            
                LogGenerator.WriteLog(NLog.LogLevel.Error, 1, LogGenerator.ModuleType.DataEngine, string.Format("!!!!!!!!!!!!!!!!!FATAL ERROR!!!!!!!!!!!!!!!!! \r\n0", excpt.ToString()), true, excpt);
                Console.WriteLine("!!!!!!!!!!!!!!!!!FATAL ERROR!!!!!!!!!!!!!!!!! 0\r\n", excpt.ToString());
            
        

但是我实现了这个逻辑,我不相信我为 ThreadPool 设置的 ThreadCount 是正确的,而且我也有一些问题,比如在循环中创建 JobDataGenerationThread 对象并在 ThreadPool 中调用它的函数是否可以?我的项目绩效没有增加。我错过了什么?

我将不胜感激任何输入.. 谢谢

编辑:

感谢 David-Browne 的回答,我更新了如下代码并删除了垃圾代码。所以这似乎比我之前的代码更安全。

            Parallel.ForEach(MainJobsQueuList, opt, job =>
            
                if (isSystemAlive())
                
                    JobDataGenerationThread singleJobThread = new JobDataGenerationThread(job.id, this);
                    singleJobThread.processJob();
                
            );

我删除了获取对象的 processJob 函数。 然而,我付出了所有努力,我发现性能问题是因为我调用的另一个应用程序在支持多线程进程方面存在一些限制..

【问题讨论】:

一个parallel for loop怎么样? 你好@Wyck你能给出更多解释吗?您希望我在哪里考虑并行 for 循环? 设置ThreadPool.SetMaxThreads 是灾难的根源,尤其是低值。您的程序可能会在任何随机时刻因无法捕获的异常而崩溃。为什么要做这样的事情? @TheodorZoulias 我想根据服务器功能设置线程数。所以我搜索并看到人们正在像这样设置池的线程数。如果有更好的意见,请建议我 如果你有一个硬性要求不要使用超过 X 个 ThreadPool 线程,并且满足这个要求比你的应用程序继续运行更重要,那么一定要使用 ThreadPool.SetMaxThreads 方法。这听起来对我来说是一个奇怪的要求,但你可能有你的理由,这些理由可能是有效的。 【参考方案1】:

并行循环就像这样:

var opt = new ParallelOptions()  MaxDegreeOfParallelism = 4 ;
Parallel.ForEach(MainJobsQueuList, opt, job => ProcessJob(job));

几乎替换所有代码。

【讨论】:

感谢您的建议。但实际上我的主线程并行调用了 fillQueue 函数。这意味着我不会等待在我的 for 循环中处理 singleJob。它只是将作业拆分为线程。我还需要定义对象“JobDataGenerationThread”来传递一些变量。你认为这个可以适合我的代码。提前致谢 尝试使用 BlockingCollection 而不是 List。然后你可以在你添加项目时读者会阻止。 我已经阻止在列表中添加一些条件,例如我上面的代码中的一些条件,所以除非过程完成,否则没有人会添加到列表中

以上是关于循环中的线程池队列用户工作项 |拆分为子线程的主要内容,如果未能解决你的问题,请参考以下文章

C++中的线程池/队列系统

(78) 线程池 / 计算机程序的思维逻辑

计算机程序的思维逻辑 (78) - 线程池

JAVA线程池

ThreadPoolExecutor 线程池理论饱和策略工作队列排队策略

Java中的线程池——ThreadPoolExecutor的原理