通过c++11的condition_variable实现的有最大缓存限制的队列
Posted albizzia
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了通过c++11的condition_variable实现的有最大缓存限制的队列相关的知识,希望对你有一定的参考价值。
之前曾写过一个通过C++11的condition_variable实现的有最大缓存限制的队列,底层使用std::queue来实现,如果想要提升性能的话,可以考虑改用固定的长度环形数组。环形数组实现如下:
#include <cassert> #include <type_traits> #include <stdexcept> /* * 文件名: circle_buffer * 实现说明:底层使用数组来实现循环buffer * (1) 当m_begIdx和m_endIdx相同时,表示数组为空,否则标识数组存在值 * (2) 通过预先多分配一个节点的方式,来实现存储count个元素的目的 */ class empty_error : public std::logic_error { explicit empty_error(const std::string& what_arg) : logic_error(what_arg) {} explicit empty_error(const char* what_arg) : logic_error(what_arg) {} }; class full_error : public std::logic_error { explicit full_error(const std::string& what_arg) : logic_error(what_arg) {} explicit full_error(const char* what_arg) : logic_error(what_arg) {} }; template <typename T> class circle_buffer { public: using size_type = size_t; public: explicit circle_buffer(size_type count) : m_bufSize(count+1), m_buf(static_cast<T*>(std::malloc(sizeof(T)*m_bufSize))), m_begIdx(0), m_endIdx(0) { assert(count >= 1); if (m_buf == nullptr) { throw std::bad_alloc(); } } ~circle_buffer() { clear(typename std::is_trivially_destructible<T>::type()); } size_t size() const noexcept { if (m_endIdx < m_begIdx) { return m_endIdx + m_bufSize - m_begIdx; } return m_endIdx - m_begIdx; } bool empty() const noexcept { return m_begIdx == m_endIdx; } bool full() const noexcept { return ((m_endIdx+1) == m_begIdx) || (m_begIdx == 0 && m_endIdx == getMaxIdx()); } // buffer最后插入一个值,这里会检查是否存在空间,如果不存在,则抛出异常 template <typename... Args> void pushCheck(Args&&... args) { if (full()) { throw full_error("pushCheck invoked when buffer is full"); } push(std::forward<Args>(args)...); } // buffer最后插入一个值,这里不做检查是否存在空间 template <typename... Args> void push(Args&&... args) { new (&m_buf[m_endIdx]) T(std::forward<Args>(args)...); advanceIdx(m_endIdx); } // buffer最前面取出一个值,这里会检查是否存在元素可以取出,如果不存在,则抛出异常 T popCheck() { if (empty()) { throw empty_error("popCheck invoked when buffer is empty"); } return pop(); } // buffer最前面取出一个值 T pop() { auto val = std::move(m_buf[m_begIdx]); clearOne(typename std::is_trivially_destructible<T>::type()); advanceIdx(m_begIdx); return val; } private: // 将指示位置的序号前进一格 void advanceIdx(size_t& idx) noexcept { if (idx == getMaxIdx()) { idx = 0; } else { ++idx; } } // 非trivially析构函数类型 void clear(std::false_type) { while (m_begIdx != m_endIdx) { m_buf[m_begIdx].~T(); advanceIdx(m_begIdx); } std::free(m_buf); } // trivially析构函数类型 void clear(std::true_type) { std::free(m_buf); } // 非trivially析构函数类型 void clearOne(std::false_type) { m_buf[m_begIdx].~T(); } // trivially 析构函数类型 void clearOne(std::true_type) { } size_t getMaxIdx() const noexcept { return m_bufSize-1; } private: size_type m_bufSize; T* m_buf; size_type m_begIdx; size_type m_endIdx; };
关于上面的环形数组,简单的单元测试代码如下,这里使用了catch2,如下代码需要放在.cpp文件中。
#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file #include "catch.hpp" #include "circle_buffer.h" TEST_CASE("circle buffer manipulation", "[circle_buffer]") { circle_buffer<int> cb(2); REQUIRE( cb.size() == 0 ); REQUIRE( cb.empty() == true); REQUIRE( cb.full() == false); cb.push(5); cb.push(6); REQUIRE( cb.size() == 2 ); REQUIRE( cb.empty() == false ); REQUIRE( cb.full() == true ); auto dropFirst = cb.pop(); REQUIRE( dropFirst == 5 ); REQUIRE( cb.size() == 1 ); REQUIRE( cb.empty() == false ); REQUIRE( cb.full() == false ); cb.push(9); REQUIRE( cb.size() == 2 ); REQUIRE( cb.empty() == false ); REQUIRE( cb.full() == true); auto dropSecond = cb.pop(); REQUIRE( dropSecond == 6 ); REQUIRE( cb.size() == 1 ); REQUIRE( cb.empty() == false ); REQUIRE( cb.full() == false ); auto dropThird = cb.pop(); REQUIRE( dropThird == 9 ); REQUIRE( cb.size() == 0 ); REQUIRE( cb.empty() == true ); REQUIRE( cb.full() == false ); }
下面是基于环形数组实现的有最大长度限制的生产者消费者队列,注意一点,在使用下面队列时,编译选项要加上-std=c++11。
#include <condition_variable> #include <chrono> #include "circle_buffer.h" template <typename T> class producer_consumer_queue { public: producer_consumer_queue(int maxSize): m_buffer(maxSize) { } // 处理数据线程 T readQueue() { T data; // 取出数据,然后处理数据 { std::unique_lock<std::mutex> lock(m_queueMtx); m_consumeCv.wait(lock, [this] { return !m_buffer.empty(); }); data = m_buffer.pop(); } m_produceCv.notify_one(); return data; } // 生产数据线程,返回值表示是否生产成功,如果超时就不会生产成功 template <typename Rep, typename Period, typename ...Args> bool writeQueue(const std::chrono::duration<Rep, Period>& wait_time, Args&& ...args) { // 预设一个消费者处理这个数据 { std::unique_lock<std::mutex> lock(m_queueMtx); auto success = m_produceCv.wait_for(lock, wait_time, [this] { return !m_buffer.full(); }); if (!success) { return false; } m_buffer.push(std::forward<Args>(args)...); } m_consumeCv.notify_one(); return true; } private: // 用来缓存数据 circle_buffer<T> m_buffer; // 用来保护数据 std::mutex m_queueMtx; // 用来提醒当前可以消费 std::condition_variable m_consumeCv; // 用来提醒当前可以生产 std::condition_variable m_produceCv; };
以上就是这个队列的具体实现。之后,考虑写一些关于中间件的知识,可能会从grpc开始吧。
以上是关于通过c++11的condition_variable实现的有最大缓存限制的队列的主要内容,如果未能解决你的问题,请参考以下文章
如何通过 C++11 的 CAS 实现 Valois 的队列
在 C++11 或以上,有没有办法通过 lambda 实现单方法纯虚拟 C++ 接口?