C ++ 11中无锁的多生产者/消费者队列
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C ++ 11中无锁的多生产者/消费者队列相关的知识,希望对你有一定的参考价值。
我正在尝试在C ++ 11中实现无锁的多生产者,多消费者队列。我将其作为学习活动来进行,因此我很清楚可以使用现有的开放源代码实现,但是我真的很想知道为什么我的代码不起作用。数据存储在环形缓冲区中,显然是“有界MPMC队列”。
我已经将其建模为与我阅读的Disruptor十分接近的模型。我注意到的是,它可以与单个消费者和单个/多个生产者一起正常工作,似乎只有多个消费者可以打破它。
这里是队列:
template <typename T>
class Queue : public IQueue<T>
{
public:
explicit Queue( int capacity );
~Queue();
bool try_push( T value );
bool try_pop( T& value );
private:
typedef struct
{
bool readable;
T value;
} Item;
std::atomic<int> m_head;
std::atomic<int> m_tail;
int m_capacity;
Item* m_items;
};
template <typename T>
Queue<T>::Queue( int capacity ) :
m_head( 0 ),
m_tail( 0 ),
m_capacity(capacity),
m_items( new Item[capacity] )
{
for( int i = 0; i < capacity; ++i )
{
m_items[i].readable = false;
}
}
template <typename T>
Queue<T>::~Queue()
{
delete[] m_items;
}
template <typename T>
bool Queue<T>::try_push( T value )
{
while( true )
{
// See that there's room
int tail = m_tail.load(std::memory_order_acquire);
int new_tail = ( tail + 1 );
int head = m_head.load(std::memory_order_acquire);
if( ( new_tail - head ) >= m_capacity )
{
return false;
}
if( m_tail.compare_exchange_weak( tail, new_tail, std::memory_order_acq_rel ) )
{
// In try_pop, m_head is incremented before the reading of the value has completed,
// so though we've acquired this slot, a consumer thread may be in the middle of reading
tail %= m_capacity;
std::atomic_thread_fence( std::memory_order_acquire );
while( m_items[tail].readable )
{
}
m_items[tail].value = value;
std::atomic_thread_fence( std::memory_order_release );
m_items[tail].readable = true;
return true;
}
}
}
template <typename T>
bool Queue<T>::try_pop( T& value )
{
while( true )
{
int head = m_head.load(std::memory_order_acquire);
int tail = m_tail.load(std::memory_order_acquire);
if( head == tail )
{
return false;
}
int new_head = ( head + 1 );
if( m_head.compare_exchange_weak( head, new_head, std::memory_order_acq_rel ) )
{
head %= m_capacity;
std::atomic_thread_fence( std::memory_order_acquire );
while( !m_items[head].readable )
{
}
value = m_items[head].value;
std::atomic_thread_fence( std::memory_order_release );
m_items[head].readable = false;
return true;
}
}
}
这是我正在使用的测试:
void Test( std::string name, Queue<int>& queue )
{
const int NUM_PRODUCERS = 64;
const int NUM_CONSUMERS = 2;
const int NUM_ITERATIONS = 512;
bool table[NUM_PRODUCERS*NUM_ITERATIONS];
memset(table, 0, NUM_PRODUCERS*NUM_ITERATIONS*sizeof(bool));
std::vector<std::thread> threads(NUM_PRODUCERS+NUM_CONSUMERS);
std::chrono::system_clock::time_point start, end;
start = std::chrono::system_clock::now();
std::atomic<int> pop_count (NUM_PRODUCERS * NUM_ITERATIONS);
std::atomic<int> push_count (0);
for( int thread_id = 0; thread_id < NUM_PRODUCERS; ++thread_id )
{
threads[thread_id] = std::thread([&queue,thread_id,&push_count]()
{
int base = thread_id * NUM_ITERATIONS;
for( int i = 0; i < NUM_ITERATIONS; ++i )
{
while( !queue.try_push( base + i ) ){};
push_count.fetch_add(1);
}
});
}
for( int thread_id = 0; thread_id < ( NUM_CONSUMERS ); ++thread_id )
{
threads[thread_id+NUM_PRODUCERS] = std::thread([&]()
{
int v;
while( pop_count.load() > 0 )
{
if( queue.try_pop( v ) )
{
if( table[v] )
{
std::cout << v << " already set" << std::endl;
}
table[v] = true;
pop_count.fetch_sub(1);
}
}
});
}
for( int i = 0; i < ( NUM_PRODUCERS + NUM_CONSUMERS ); ++i )
{
threads[i].join();
}
end = std::chrono::system_clock::now();
std::chrono::duration<double> duration = end - start;
std::cout << name << " " << duration.count() << std::endl;
std::atomic_thread_fence( std::memory_order_acq_rel );
bool result = true;
for( int i = 0; i < NUM_PRODUCERS * NUM_ITERATIONS; ++i )
{
if( !table[i] )
{
std::cout << "failed at " << i << std::endl;
result = false;
}
}
std::cout << name << " " << ( result? "success" : "fail" ) << std::endl;
}
任何朝着正确方向前进的方法都将不胜感激。我对内存隔离栅还很陌生,而不仅仅是对所有东西都使用互斥锁,所以我可能只是从根本上误解了一些东西。
欢呼J
我来看看Moody Camel的实现。
这是完全用C ++ 11编写的,用于C ++的快速通用无锁队列。文档似乎还不错,并且进行了一些性能测试。
[在所有其他有趣的事物中(无论如何,它们都值得一读),所有这些都包含在单个标头中,并且可以在简化的BSD许可下使用。只需将其放入您的项目中即可享受!
最简单的方法使用循环缓冲区。那就像是一个由256个元素组成的数组,并且您使用uint8_t
作为索引,因此它会环绕并在溢出时从头开始。
您可以建立的最简单的原语是当您具有单个生产者,单个使用者线程时。
缓冲区有两个头:
- 写入头:指向接下来要写入的元素。
- 读取头:指向接下来将要读取的元素。
生产者的操作:
- 如果写头+ 1 ==读头,则缓冲区已满,返回缓冲区已满错误。
- 将内容写入元素。
- 插入内存屏障以同步CPU内核。
- 将写头向前移动。
在缓冲区已满的情况下,仍然还有1个房间,但是我们保留该空间,以区别于缓冲区空的情况。
消费者的操作:
- 如果读头==写头,则缓冲区为空,返回缓冲区为空错误。
- 读取元素的内容。
- 插入内存屏障以同步CPU内核。
- 将读取头向前移动。
生产者拥有写头,消费者拥有读头,这些没有并发性。另外,在操作完成时更新磁头,以确保使用者将完成的元素留在后面,而消耗则将完全消耗的空单元留在后面。
每当派出新线程并可以与线程进行双向通信时,都在两个方向上创建两个这样的管道。
鉴于我们在谈论锁释放,这也意味着没有线程被阻塞,当无事可做时,线程正在空转,您可能希望检测到这一点并在发生这种情况时增加一些睡眠。
[lock free queue如何?
这是无内存排序的无锁队列,但这需要在初始化队列时预先设置当前线程的数量。
例如:-
int* ret;
int max_concurrent_thread = 16;
lfqueue_t my_queue;
lfqueue_init(&my_queue, max_concurrent_thread );
/** Wrap This scope in other threads **/
int_data = (int*) malloc(sizeof(int));
assert(int_data != NULL);
*int_data = i++;
/*Enqueue*/
while (lfqueue_enq(&my_queue, int_data) == -1) {
printf("ENQ Full ?
");
}
/** Wrap This scope in other threads **/
/*Dequeue*/
while ( (int_data = lfqueue_deq(&my_queue)) == NULL) {
printf("DEQ EMPTY ..
");
}
// printf("%d
", *(int*) ret );
free(ret);
/** End **/
lfqueue_destroy(&my_queue);
[在另一个类似的问题上,我提出了a solution这个问题。我相信它是迄今为止发现的最小的。
我在这里不会给出相同的答案,但是the repository具有您想要的无锁队列的功能齐全的C ++实现。
编辑:感谢@PeterCordes的代码审查,当在模板上使用64位整数时,我发现了该解决方案中的一个错误,但现在它可以正常工作。
这是运行测试时收到的输出
Creating 4 producers & 4 consumers
to flow 10.000.000 items trough the queue.
Produced: 10.743.668.245.000.000
Consumed: 5.554.289.678.184.004
Produced: 10.743.668.245.000.000
Consumed: 15.217.833.969.059.643
Produced: 10.743.668.245.000.000
Consumed: 7.380.542.769.600.801
Produced: 10.743.668.245.000.000
Consumed: 14.822.006.563.155.552
Checksum: 0 (it must be zero)
以上是关于C ++ 11中无锁的多生产者/消费者队列的主要内容,如果未能解决你的问题,请参考以下文章