并发编程多线程程序同步策略

Posted bigosprite

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程多线程程序同步策略相关的知识,希望对你有一定的参考价值。

C++11线程使用初探

std::thread

#include <thread>

只读的共享数据在多个线程间不存在Race condition的危险,而可读可写共享数据在线程间共享时则需做好线程同步,即数据保护,主要包括lock-based和lock-free策略。

常见的以互斥锁保护多线程间的共享数据,保证某一时刻仅有一个线程访问共享数据,导致线程间数据保护是串行,因此在多线程环境中,锁保护的区域越小,并发程度越高。

采用条件变量等待某个事件或条件发生

C++11提供了两种条件变量:std::condition_variable和std::condition_variable_any,均需要和互斥量一起使用来保证操作的同步性。前者仅能与std::mutex一起使用,后者可与所有mutex-like的锁一起使用,更加通用,但以牺牲空间、性能或操作系统资源为代价,因此std::condition_variable是首选;它们均在头文件中定义。

生产者-消费者模式在并发编程中应用广泛,有助于系统的解耦。队列是一种常见的在生产者和消费者线程间传递数据的容器,队列先入先出的特性满足应用对顺序性的要求。

人脸分析组件采用队列传递数据,通过在生产者线程中调用InputData函数将待分析数据包送入队列,并调用notify_one通知消费者线程,从而消费者线程进行人脸数据分析。伪代码如下:

std::mutex mut;
std::condition_variable cond;
std::queue<data_chunk> data_queue;

// 将待分析数据送入队列
int InputData(const data_chunk& data)

  if (invalid_data)
  
    LOG_ERROR("invalid param!");
    return ERROR_CODE;
  
  // 将数据送入队列
  // 并发出通知
  std::lock_guard<std::mutex> lk(mut);
  data_queue.push(data);
  cond.notify_one();
  return SUCCESS_CODE;


// InputData在生产者线程中被调用,将待分析数据送入队列
// 消费线程函数Process从队列取数据以进行分析
// 线程在条件未发生时因wait函数阻塞进入睡眠状态
void Process()

  while (not_exit_expression)
  
    // 使用unique_lock而非lock_guard
    std::unique_lock<std::mutex> lk(mut);

    // wait函数在条件满足时返回,否则线程进入阻塞状态
    //
    // 若lambda表达式返回false(即不条件满足),则wait释放lk中锁资源,
    // 且线程进入阻塞状态,以便生线程可以继续获取锁并送数据到队里;否则,
    //
    // 当notify_one通知条件变量时,消费消除从睡眠状态苏醒,重新获取锁,且对条件再次检查,lambda表达式返回true
    // 此时,wait返回并继续持有锁资源,然后继续往下执行
    cond.wait(lk, []() return !data_queue.empty(); );

    data_chunk data = data_queue.front();
    data_queue.pop();

    // wait返回后持有锁,因此需要在此处解锁
    lk.unlock();

    // 处理数据
    process_the_data;
  

?? 若将Process循环中阻塞逻辑(wait函数)换为非阻塞模式逻辑——即当队列为空时,循环continue,将如何影响Process线程?

以上代码,消费者线程使用std::unique_lock而非std::lock_guard是因为前者较后者灵活,std::lock_guard未实现lock/unlock成员函数。另外,队列在线程间传递数据应用广泛,将非线程安全的队列和条件变量封装到具体场景类,不仅编码重复,而且低效易出错。因此,实现线程安全的队列是十分必要的。

线程安全的队列适配器

上小节已介绍了“队列,条件变量和互斥量”的应用场景,现将它们封装为线程安全的队列适配器——CTSQueue。

线程安全的队列 - 需求分析:

  • 仍具备队列的先入先出特性;
  • 支持数据队尾入队列、队首出队列操作;
  • 提供阻塞和非阻塞版本出队列操作;
  • 泛型队列;
  • 使用C++11容器及线程同步原语,当然也可使用其他如posix提供的同步原语;

完整代码如下:

#include <queue>              // for queue
#include <memory>             // for std::shared_ptr
#include <mutex>              // for std::mutex
#include <condition_variable> // for std::condition_variable

// 线程安全的队列
template<typename T>
class CTSQueue

public:
  CTSQueue() = default;
  ~CTSQueue()
   

  // 不提供复制操作 - 禁用
  CTSQueue(const CTSQueue&) = delete;
  CTSQueue& operator=(const CTSQueue&) = delete;

  // 入队列操作
  void Push(T data)
  
    std::lock_guard<std::mutex> lk(mut_);
    data_queue_.push(data);
    cond_.notify_one();
  

  // 出队列操作 - 非阻塞版本
  bool TryPop(T& data)
  
    std::lock_guard<std::mutex> lk(mut_);
    // 空队列立马返回出队列失败,否则
    // 数据出队列(拷贝版本)
    if (data_queue_.empty())
    
      return false;
    
    data = data_queue_.front();
    data_queue_.pop();
    return true;
  
  std::shared_ptr<T> TryPop()
  
    std::lock_guard<std::mutex> lk(mut_);
    if (data_queue_.empty())
    
      return std::shared_ptr<T>();
    
    // 出队列(非拷贝版本)
    std::shared_ptr<T> res(std::make_shared<T>(data_queue_.front()));
    data_queue_.pop();
    return res;
  

  // 出队列操作 - 阻塞版本
  void WaitPop(T& data)
  
    std::lock_guard<std::mutex> lk(mut_);
    // 条件未发生时,调用线程阻塞
    // 否则,数据出队列(拷贝版本)
    cond_.wait(lk, [this]() return !data_queue_.empty() );
    data = data_queue_.front();
    data_queue_.pop();
  
  std::shared_ptr<T> WaitPop()
  
    std::lock_guard<std::mutex> lk(mut_);
    cond_.wait(lk, [this]() return !data_queue_.empty() );
    // 出队列(非拷贝版本)
    std::shared_ptr<T> res(std::make_shared<T>(data_queue_.front()));
    data_queue_.pop();
    return res;
  

  // 查询队列状态操作
  bool Empty() const
  
    std::lock_guard<std::mutex> lk(mut_);
    return data_queue_.empty();
  

private:
  mutable std::mutex      mut_;
  std::queue<T>           data_queue_;
  std::condition_variable cond_;
;

参考文献:
[1] 《C++ Concurrency In Action》, https://www.bogotobogo.com/cplusplus/files/CplusplusConcurrencyInAction_PracticalMultithreading.pdf

以上是关于并发编程多线程程序同步策略的主要内容,如果未能解决你的问题,请参考以下文章

并发编程-线程安全策略之两种类型的同步容器

多线程编程-之并发编程:同步容器

并发编程-线程安全策略之并发容器(J.U.C)中的集合类

并发编程-线程安全策略之不可变对象

Java并发多线程编程——线程池

java并发常识