通过c++11的condition_variable实现的有最大缓存限制的队列

Posted albizzia

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了通过c++11的condition_variable实现的有最大缓存限制的队列相关的知识,希望对你有一定的参考价值。

之前曾写过一个通过C++11的condition_variable实现的有最大缓存限制的队列,底层使用std::queue来实现,如果想要提升性能的话,可以考虑改用固定的长度环形数组。环形数组实现如下:

#include <cassert>
#include <type_traits>
#include <stdexcept>

/*
 * 文件名: circle_buffer
 * 实现说明:底层使用数组来实现循环buffer
 * (1) 当m_begIdx和m_endIdx相同时,表示数组为空,否则标识数组存在值
 * (2) 通过预先多分配一个节点的方式,来实现存储count个元素的目的
 */

class empty_error : public std::logic_error {
    explicit empty_error(const std::string& what_arg)
        : logic_error(what_arg)
    {}

    explicit empty_error(const char* what_arg)
        : logic_error(what_arg)
    {}

};

class full_error : public std::logic_error {
    explicit full_error(const std::string& what_arg)
       : logic_error(what_arg)
    {}

    explicit full_error(const char* what_arg)
        : logic_error(what_arg)
    {}

};

template <typename T>
class circle_buffer {
public:
    using size_type = size_t;
public:
    explicit circle_buffer(size_type count)
    : m_bufSize(count+1),
      m_buf(static_cast<T*>(std::malloc(sizeof(T)*m_bufSize))),
      m_begIdx(0),
      m_endIdx(0)
    {
        assert(count >= 1);
        if (m_buf == nullptr) {
            throw std::bad_alloc();
        }
    }

    ~circle_buffer() {
        clear(typename std::is_trivially_destructible<T>::type());
    }

    size_t size() const noexcept {
        if (m_endIdx < m_begIdx) {
            return m_endIdx + m_bufSize - m_begIdx;
        } 
        return m_endIdx - m_begIdx;
    }

    bool empty() const noexcept {
        return m_begIdx == m_endIdx;
    }

    bool full() const noexcept {
        return ((m_endIdx+1) == m_begIdx) || 
            (m_begIdx == 0 && m_endIdx == getMaxIdx());
    }

    // buffer最后插入一个值,这里会检查是否存在空间,如果不存在,则抛出异常
    template <typename... Args>
    void pushCheck(Args&&... args) {
        if (full()) {
            throw full_error("pushCheck invoked when buffer is full");
        } 

        push(std::forward<Args>(args)...);
    }
    
    // buffer最后插入一个值,这里不做检查是否存在空间
    template <typename... Args>
    void push(Args&&... args) {
        new (&m_buf[m_endIdx]) T(std::forward<Args>(args)...);
        advanceIdx(m_endIdx);
    }

    // buffer最前面取出一个值,这里会检查是否存在元素可以取出,如果不存在,则抛出异常
    T popCheck() {
        if (empty()) {
            throw empty_error("popCheck invoked when buffer is empty");
        }

        return pop();
    }

    // buffer最前面取出一个值
    T pop() {
        auto val = std::move(m_buf[m_begIdx]);
        clearOne(typename std::is_trivially_destructible<T>::type());
        advanceIdx(m_begIdx);
        return val;
    }


private:
    // 将指示位置的序号前进一格
    void advanceIdx(size_t& idx) noexcept {
        if (idx == getMaxIdx()) {
            idx = 0;
        } else {
            ++idx;
        }
    }

    // 非trivially析构函数类型
    void clear(std::false_type) {
        while (m_begIdx != m_endIdx) {
            m_buf[m_begIdx].~T();
            advanceIdx(m_begIdx);
        }
        std::free(m_buf);
    }

    // trivially析构函数类型
    void clear(std::true_type) {
        std::free(m_buf);
    }

    // 非trivially析构函数类型
    void clearOne(std::false_type) {
        m_buf[m_begIdx].~T();
    }

    // trivially 析构函数类型
    void clearOne(std::true_type) {
    }

    size_t getMaxIdx() const noexcept {
        return m_bufSize-1;
    }

private:
    size_type m_bufSize;
    T* m_buf;
    size_type m_begIdx;
    size_type m_endIdx;
};

关于上面的环形数组,简单的单元测试代码如下,这里使用了catch2,如下代码需要放在.cpp文件中。

#define CATCH_CONFIG_MAIN       
// This tells Catch to provide a main() - only do this in one cpp file
#include "catch.hpp"
#include "circle_buffer.h"

TEST_CASE("circle buffer manipulation", "[circle_buffer]") {
    circle_buffer<int> cb(2);
    
    REQUIRE( cb.size() == 0 );
    REQUIRE( cb.empty() == true);
    REQUIRE( cb.full() == false);

    cb.push(5);
    cb.push(6);

    REQUIRE( cb.size() == 2 );
    REQUIRE( cb.empty() == false );
    REQUIRE( cb.full() == true );

    auto dropFirst = cb.pop();
    
    REQUIRE( dropFirst == 5 );
    REQUIRE( cb.size() == 1 );
    REQUIRE( cb.empty() == false );
    REQUIRE( cb.full() == false );

    cb.push(9);

    REQUIRE( cb.size() == 2 );
    REQUIRE( cb.empty() == false );
    REQUIRE( cb.full() == true);

    auto dropSecond = cb.pop();

    REQUIRE( dropSecond == 6 );
    REQUIRE( cb.size() == 1 );
    REQUIRE( cb.empty() == false );
    REQUIRE( cb.full() == false );

    auto dropThird = cb.pop();

    REQUIRE( dropThird == 9 );
    REQUIRE( cb.size() == 0 );
    REQUIRE( cb.empty() == true );
    REQUIRE( cb.full() == false );
}

下面是基于环形数组实现的有最大长度限制的生产者消费者队列,注意一点,在使用下面队列时,编译选项要加上-std=c++11。

#include <condition_variable>
#include <chrono>
#include "circle_buffer.h"

template <typename T>
class producer_consumer_queue {
public:
    producer_consumer_queue(int maxSize): m_buffer(maxSize) { }

    // 处理数据线程
    T readQueue() {
        T data;
        // 取出数据,然后处理数据
        {
            std::unique_lock<std::mutex> lock(m_queueMtx);
            m_consumeCv.wait(lock, [this] { return !m_buffer.empty(); });

            data = m_buffer.pop();
        }
        m_produceCv.notify_one();

        return data;
    }

    // 生产数据线程,返回值表示是否生产成功,如果超时就不会生产成功
    template <typename Rep, typename Period, typename ...Args>
    bool writeQueue(const std::chrono::duration<Rep, Period>& wait_time, Args&& ...args) {
        // 预设一个消费者处理这个数据
        {
            std::unique_lock<std::mutex> lock(m_queueMtx);
            auto success = m_produceCv.wait_for(lock, wait_time, [this] { return !m_buffer.full(); });
            if (!success) {
                return false;
            }
            m_buffer.push(std::forward<Args>(args)...);
        }
        m_consumeCv.notify_one();
        return true;
    }

private:
    // 用来缓存数据
    circle_buffer<T> m_buffer;
    // 用来保护数据
    std::mutex m_queueMtx;
    // 用来提醒当前可以消费
    std::condition_variable m_consumeCv;
    // 用来提醒当前可以生产
    std::condition_variable m_produceCv;
};

以上就是这个队列的具体实现。之后,考虑写一些关于中间件的知识,可能会从grpc开始吧。

 

以上是关于通过c++11的condition_variable实现的有最大缓存限制的队列的主要内容,如果未能解决你的问题,请参考以下文章

如何通过 C++11 的 CAS 实现 Valois 的队列

通过共享内存和管道的 IPC 给出分段错误:C 中的 11

在 C++11 或以上,有没有办法通过 lambda 实现单方法纯虚拟 C++ 接口?

c_cpp 使用C ++ 11模板模板参数通过链接继承添加伪方面

C++11多线程笔记

通过c++11的condition_variable实现的有最大缓存限制的队列