我应该如何改进线程池以使其更安全?

Posted

技术标签:

【中文标题】我应该如何改进线程池以使其更安全?【英文标题】:How should I improve a thread pool to make it more thread safe? 【发布时间】:2019-08-13 19:52:57 【问题描述】:

我目前正在学习有关线程池的基础知识。以下是我根据网上找到的一些示例编写的一些代码块:

SyncQueue.h

#ifndef SYNC_QUEUE_H
#define SYNC_QUEUE_H

#include <list>
#include <mutex>
#include <iostream>

template<typename T>
class SyncQueue 
public:
  SyncQueue();
  ~SyncQueue();
  SyncQueue(const SyncQueue&) = delete;
  SyncQueue& operator=(const SyncQueue &) = delete;
  void append(const T& data);
  T& get();
  unsigned long size();
  bool empty();
private:
  std::list<T> queue;
  std::mutex myMutex;
;
#endif

SyncQueue.cpp

#include "SyncQueue.h"

template<typename T>
SyncQueue<T>::SyncQueue():
  queue(),
  myMutex() 

template<typename T>
SyncQueue<T>::~SyncQueue() 

template<typename T>
void SyncQueue<T>::append(const T& data) 
  std::unique_lock<std::mutex> l(myMutex);
  queue.push_back(data);


template<typename T>
T& SyncQueue<T>::get() 
  std::unique_lock<std::mutex> l(myMutex);
  T& res = queue.front();
  queue.pop_front();
  return res;


template<typename T>
unsigned long SyncQueue<T>::size() 
  std::unique_lock<std::mutex> l(myMutex);
  return queue.size();


template<typename T>
bool SyncQueue<T>::empty() 
  std::unique_lock<std::mutex> l(myMutex);
  return queue.empty();


template class SyncQueue<std::function<void()>>;

线程池.h

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <atomic>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>
#include "SyncQueue.h"

class ThreadPool 
public:
  ThreadPool(unsigned long thrdAmount = 0);
  virtual ~ThreadPool();
  void appendTask(std::function<void()> func);
  unsigned long pendingTasks();
private:
  void runThread();
  unsigned int myThrdAmount;
  std::atomic<bool> done;
  SyncQueue<std::function<void()>> syncQueue;
  std::vector<std::thread> threads;
  std::condition_variable myCondVar;
  std::mutex myMutex;
;

#endif

线程池.cpp

#include "ThreadPool.h"

ThreadPool::ThreadPool(unsigned long thrdAmount):
  myThrdAmount(0),
  done(false),
  syncQueue(),
  threads(),
  myCondVar(),
  myMutex() 
  if (thrdAmount > 0) 
    myThrdAmount = thrdAmount;
   else 
    myThrdAmount = std::thread::hardware_concurrency();
  
  for (unsigned int i = 0; i < myThrdAmount; i++) 
    threads.push_back(std::thread(&ThreadPool::runThread, this));
  


ThreadPool::~ThreadPool() 
  done = true;
  myCondVar.notify_all();
  for (auto& thrd: threads) 
    if (thrd.joinable()) 
      thrd.join();
    
  


void ThreadPool::appendTask(std::function<void()> func) 
  syncQueue.append(func);
  
    std::unique_lock<std::mutex> l(myMutex);
    myCondVar.notify_one();
  


unsigned long ThreadPool::pendingTasks() 
  return syncQueue.size();


void ThreadPool::runThread() 
  while (!done) 
    if (syncQueue.empty()) 
      std::unique_lock<std::mutex> l(myMutex);
      myCondVar.wait(l);
      continue;
    
    syncQueue.get()();
  

main.cpp

#include <unistd.h>
#include <iostream>
#include "ThreadPool.h"

void print() 
  std::cout << "Hello World!" << std::endl;


int main(int argc, char const *argv[]) 
  ThreadPool p;
  for (int i = 0; i < 20; i++) 
    p.appendTask(print);
  
  std::cout << "Pending: " << p.pendingTasks() << std::endl;
  sleep(5);
  for (int i = 0; i < 20; i++) 
    p.appendTask(print);
  
  return 0;

尽管SyncQueue 上的所有操作都被互斥锁锁定,并且ThreadPool 的条件变量也受互斥锁保护,但代码通常会导致未定义的行为。

也就是说,你能解释一下代码缺乏线程安全的地方吗?我应该如何改进它?

【问题讨论】:

注意:您不使用std::unique_lock 的任何额外功能,因此您可以使用std::lock_guard 您的get 函数返回对已破坏项目的引用。那永远不会有好的结局。 注意:template&lt;typename T&gt; SyncQueue&lt;T&gt;::~SyncQueue() 是一个用户定义的析构函数,即使是空的,根据定义,它也不再是一个 trivial 析构函数。将它声明为= default; 在标题中 代替(并删除实现)它是微不足道的。您的默认构造函数也是如此。 注意:你的sizeemptypendingTasks函数应该是const 【参考方案1】:
 void ThreadPool::appendTask(std::function<void()> func) 
  syncQueue.append(func);
  
    std::unique_lock<std::mutex> l(myMutex);
    myCondVar.notify_one();
  


void ThreadPool::runThread() 
  while (!done) 
    if (syncQueue.empty()) 
      std::unique_lock<std::mutex> l(myMutex);
      myCondVar.wait(l);
      continue;
    
    syncQueue.get()();
  

问题是myMutex 实际上并没有保护任何东西。因此,您的代码在等待队列时遇到了灾难性的竞争条件。

考虑:

    调用runThread 的线程发现syncQueue 为空。 线程调用appendTask 将作业添加到队列并调用notify_one。没有要通知的线程。 调用runThread 的线程最终获得myMutex 的锁定并等待条件变量,但队列不为空。

绝对重要的是,您用于等待的条件变量与保护您正在等待的谓词的互斥锁相关联。条件变量的全部目的是允许您以原子方式解锁谓词并等待没有竞争条件的信号。但是您将谓词隐藏在syncQueue 中,从而破坏了条件变量的锁定处理逻辑。

您可以通过在myMutex 互斥锁的保护下对syncQueue 进行所有调用来修复此竞争条件。但是让syncQueue 等待可能更有意义。不过,这可能会使关闭线程池变得更加困难。

【讨论】:

to make syncQueue waitable 是什么意思?您的意思是在SyncQueue 类中使用条件变量作为成员会更好吗? @rudicangiotti 是的。您的代码需要一个可等待队列,即允许您等待将事物放入其上的队列。 使用多个互斥体怎么样?他们应该允许独立地附加和获取任务,而不中断非原子操作,不是吗? Here 是一个小代码草案。正如您所介绍的,在这种情况下,关闭会更加棘手。 @rudicangiotti 看起来更好。只需修复 if (queue.empty()) 中的 get。你需要一个while。考虑:1)两个线程正在等待作业。 2) 两个作业被放入队列。 3) 两个线程都被唤醒。 4)第一个线程被调度,完成这两项工作。 5)第二个线程终于开始运行,尝试从一个空队列中获取。 5) 繁荣 @rudicangiotti 您可能会发现 this answer 很有帮助。

以上是关于我应该如何改进线程池以使其更安全?的主要内容,如果未能解决你的问题,请参考以下文章

设置 Spring Boot 的定时任务线程池以优雅退出

设置 Spring Boot 的定时任务线程池以优雅退出

重写以下 SQL 查询以使其更高效/改进其执行及其原因

我应该合并事实表的列以使其更窄,还是应该通过大量列使其对用户更友好?

异步多线程----执行器(Executor)

如何在线程池队列上观察我的快速源