线程安全的缓冲区数组

Posted

技术标签:

【中文标题】线程安全的缓冲区数组【英文标题】:Thread safe array of buffers 【发布时间】:2016-09-27 12:20:49 【问题描述】:

我目前正在重构我在 nvidia 硬件编码器中找到的一些代码,用于压缩视频图像。原来的问题在这里:wondering if I can use stl smart pointers for this

根据答案,我将代码更新如下:

根据答案和 cmets,我尝试制作一个。这里是。请发表评论。

#ifndef __BUFFER_ARRAY_H__
#define __BUFFER_ARRAY_H__

#include <vector>
#include <mutex>
#include <thread>

template<class T>
class BufferArray

public:
    class BufferArray()
        :num_pending_items(0), pending_index(0), available_index(0)
    

    // This method is not thread-safe. 
    // Add an item to our buffer list
    // Note we do not take ownership of the incoming pointer.
    void add(T * buffer)
    
        buffer_array.push_back(buffer);
    

    // Returns a naked pointer to an available buffer. Should not be
    // deleted by the caller. 
    T * get_available()
    
        std::lock_guard<std::mutex> lock(buffer_array_mutex);
        if (num_pending_items == buffer_array.size()) 
            return NULL;
               
        T * buffer = buffer_array[available_index];
        // Update the indexes.
        available_index = (available_index + 1) % buffer_array.size();
        num_pending_items += 1;
        return buffer;
    

    T * get_pending()
    
        std::lock_guard<std::mutex> lock(buffer_array_mutex);
        if (num_pending_items == 0) 
            return NULL;
        

        T * buffer = buffer_array[pending_index];
        pending_index = (pending_index + 1) % buffer_array.size();
        num_pending_items -= 1;
        return buffer;
    


private:
    std::vector<T * >                   buffer_array;
    std::mutex                          buffer_array_mutex;
    unsigned int                        num_pending_items;
    unsigned int                        pending_index;
    unsigned int                        available_index;

    // No copy semantics
    BufferArray(const BufferArray &) = delete;
    void operator=(const BufferArray &) = delete;
;

#endif

我的问题是我是否在这里违反了一些 C++ 良好实践建议?此外,我正在扩展该类,以便可以访问和使用我的多个线程。我想知道是否有什么我可能错过的。

【问题讨论】:

您的add() 不是线程安全的。它还需要一个锁防护装置。否则,问题就太模糊不清了。 与您的问题无关,但以双下划线或下划线后跟大写字母开头的符号在所有范围内都保留。请参阅this old answer 了解更多信息。 @Luca 在规则中包含守卫计数。 您的评论说您不拥有传入指针的所有权,但函数本身确实拥有它。 buffer_array 应该包含唯一指针还是原始指针? 为什么不使用lock free queue之类的东西?您可以将所有作业推送到队列中,然后工作线程可以根据需要不断弹出它们。如果您需要在完成后将作业放回原处,那么工作线程可以将其推回队列的后面。 【参考方案1】:

我想我会这样处理它:

在这个测试中,“处理”只是将一个 int 乘以 2。但请注意处理器线程如何从待处理队列中取出待处理数据,对其进行处理,然后将可用数据推送到可用队列。然后它发出信号(通过条件变量)消费者(在这种情况下,您的磁盘写入器)应该再次查找可用数据。

#include <vector>
#include <mutex>
#include <thread>
#include <queue>
#include <condition_variable>
#include <iostream>

namespace notstd 
    template<class Mutex> auto getlock(Mutex& m)
    
        return std::unique_lock<Mutex>(m);
    


template<class T>
class ProcessQueue

public:
    ProcessQueue()
    

    // This method is not thread-safe.
    // Add an item to our buffer list
    // Note we do not take ownership of the incoming pointer.
    // @pre start_processing shall not have been called
    void add(T * buffer)
    
        pending_.push(buffer);
    

    void start_processing()
    
        process_thread_ = std::thread([this] 
            while(not this->pending_.empty())
            
                auto lock = notstd::getlock(this->mutex_);
                auto buf = this->pending_.front();
                lock.unlock();

                //
                // this is the part that processes the "buffer"

                *buf *= 2;

                //
                // now notify the structure that the processing is done - buffer is available
                //

                lock.lock();
                this->pending_.pop();
                this->available_.push(buf);
                lock.unlock();
                this->change_.notify_one();
            
        );
    

    T* wait_available()
    
        auto lock = notstd::getlock(mutex_);
        change_.wait(lock, [this]  return not this->available_.empty() or this->pending_.empty(); );
        if (not available_.empty())
        
            auto p = available_.front();
            available_.pop();
            return p;
        

        lock.unlock();
        process_thread_.join();
        return nullptr;
    

private:
    std::queue<T * >                   pending_;
    std::queue<T * >                   available_;
    std::mutex                          mutex_;
    std::condition_variable             change_;
    std::thread                     process_thread_;

    // No copy semantics - implicit because of the mutex
;

int main()

    ProcessQueue<int> pq;

    std::vector<int> v =  1, 2, 3, 4, 5, 6, 7, 8, 9 ;
    for (auto& i : v) 
        pq.add(std::addressof(i));
    

    pq.start_processing();

    while (auto p = pq.wait_available())
    
        std::cout << *p << '\n';
    

预期输出:

2
4
6
8
10
12
14
16
18

【讨论】:

以上是关于线程安全的缓冲区数组的主要内容,如果未能解决你的问题,请参考以下文章

线程安全 FIFO 队列/缓冲区

将缓冲区写入 Java 通道:线程安全与否?

线程安全的生产者消费者四种实现方法

hashMap线程不安全的原因及表现

使用无锁指针队列在线程之间移动数据是不是安全

将互斥保护构建到 C++ 类中的线程安全方法?