实现有界缓冲区(读取器和写入器之间的非阻塞,读取器之间的阻塞,写入器之间的阻塞)
Posted
技术标签:
【中文标题】实现有界缓冲区(读取器和写入器之间的非阻塞,读取器之间的阻塞,写入器之间的阻塞)【英文标题】:implement bounded buffer (non-block between reader and writer, block among readers, block among writers) 【发布时间】:2016-09-13 21:52:14 【问题描述】:实现有界队列
阅读: 如果队列为空,则等待它可以返回一个超时值 如果另一个线程正在从队列中读取,则等待该线程完成 从队列中移除第一个元素并返回它 如果线程正在写入队列,请不要阻塞
写: 如果队列已满,请等待读取一个值并超时 如果另一个线程正在写入队列,请等待该线程完成 将元素写入队列末尾 如果线程正在从队列中读取,请不要阻塞
我不确定我的实现是否正确
using namespace std;
template <typename T, int N>
class BoundedBuffer
private:
std::array<T, N> buffer;
int read_pos;
int write_pos;
std::mutex reader_mutex; //mutex for between readers
std::mutex writer_mutex; //mutex for between writers
std::mutex shared_mutex;
std::condition_variable reader_queue;
std::condition_variable writer_queue;
int timeout; //timeout in millisecond
public:
BoundedBuffer(const BoundedBuffer&) = delete;
BoundedBuffer& operator=(const BoundedBuffer&) = delete;
BoundedBuffer(int t) :
read_pos(0),
write_pos(0),
timeout(t)
inline bool empty()
return read_pos == write_pos;
inline bool full()
return write_pos >= read_pos + N;
bool put(const T& data)
unique_lock<mutex> writer_lock(writer_mutex);
unique_lock<mutex> shared_lock(shared_mutex);
if (full()) //buffer full
if (writer_queue.wait_for(shared_lock, std::chrono::milliseconds(timeout)) ==
std::cv_status::timeout)
return false;
buffer[write_pos%N] = data;
write_pos++;
reader_queue.notify_one();
return true;
pair<T, bool> get()
unique_lock<mutex> reader_lock(reader_mutex);
unique_lock<mutex> shared_lock(shared_mutex);
if (empty()) //buffer empty
if (reader_queue.wait_for(shared_lock, std::chrono::milliseconds(timeout)) ==
std::cv_status::timeout)
T t;
return make_pair(t, false);
pair<T, bool> result = make_pair(buffer[read_pos%N], true);
read_pos++;
writer_queue.notify_one();
return result;
;
【问题讨论】:
【参考方案1】:在我的代码中发现了一个错误: 在 put() 和 get() 方法中,它调用 wait_for() 并检查返回值。 如果超时,则返回 false,否则假定满足等待条件并且代码继续放置/获取数据。 如果 wait_for() 由于虚假唤醒而返回,并且当时条件不满足: put() 不满,get() 不空, 那么它将覆盖现有数据或读取错误数据。
解决方法是在 wait_for() 中使用谓词。它将忽略虚假唤醒。
bool put(const T& data)
unique_lock<mutex> writer_lock(writer_mutex);
unique_lock<mutex> shared_lock(shared_mutex);
if (full()) //buffer full
if (writer_queue.wait_for(shared_lock, std::chrono::milliseconds(timeout),
[this]() return !full(); ) == false)
return false;
buffer[write_pos%N] = data;
write_pos++;
reader_queue.notify_one();
return true;
pair<T, bool> get(int i)
unique_lock<mutex> reader_lock(reader_mutex);
cout << "i : " << i << endl;
unique_lock<mutex> shared_lock(shared_mutex);
if (empty()) //buffer empty
if (reader_queue.wait_for(shared_lock, std::chrono::milliseconds(timeout),
[this]() return !empty(); )==false)
T t;
return make_pair(t, false);
pair<T, bool> result = make_pair(buffer[read_pos%N], true);
read_pos++;
writer_queue.notify_one();
return result;
【讨论】:
以上是关于实现有界缓冲区(读取器和写入器之间的非阻塞,读取器之间的阻塞,写入器之间的阻塞)的主要内容,如果未能解决你的问题,请参考以下文章