Boost lockfree deque 生产者与消费者多对多线程应用
Posted hbright
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Boost lockfree deque 生产者与消费者多对多线程应用相关的知识,希望对你有一定的参考价值。
boost库中有一个boost::lockfree::queue类型的 队列,对于一般的需要队列的程序,其效率都算不错的了,下面使用一个用例来说明。
程序是一个典型的生产者与消费者的关系,都可以使用多线程,其效率要比使用上层的互斥锁要快很多,因为它直接使用底层的原子操作来进行同步数据的。
freedeque.h
1 #pragma once#ifndef INCLUDED_UTILS_LFRINGQUEUE 2 #define INCLUDED_UTILS_LFRINGQUEUE 3 4 #define _ENABLE_ATOMIC_ALIGNMENT_FIX 5 #define ATOMIC_FLAG_INIT 0 6 7 8 #pragma once 9 10 11 #include <vector> 12 #include <mutex> 13 #include <thread> 14 #include <atomic> 15 #include <chrono> 16 #include <cstring> 17 #include <iostream> 18 19 // Lock free ring queue 20 21 template < typename _TyData, long _uiCount = 100000 > 22 class lfringqueue 23 { 24 public: 25 lfringqueue(long uiCount = _uiCount) : m_lTailIterator(0), m_lHeadIterator(0), m_uiCount(uiCount) 26 { 27 m_queue = new _TyData*[m_uiCount]; 28 memset(m_queue, 0, sizeof(_TyData*) * m_uiCount); 29 } 30 31 ~lfringqueue() 32 { 33 if (m_queue) 34 delete[] m_queue; 35 } 36 37 bool enqueue(_TyData *pdata, unsigned int uiRetries = 1000) 38 { 39 if (NULL == pdata) 40 { 41 // Null enqueues are not allowed 42 return false; 43 } 44 45 unsigned int uiCurrRetries = 0; 46 while (uiCurrRetries < uiRetries) 47 { 48 // Release fence in order to prevent memory reordering 49 // of any read or write with following write 50 std::atomic_thread_fence(std::memory_order_release); 51 52 long lHeadIterator = m_lHeadIterator; 53 54 if (NULL == m_queue[lHeadIterator]) 55 { 56 long lHeadIteratorOrig = lHeadIterator; 57 58 ++lHeadIterator; 59 if (lHeadIterator >= m_uiCount) 60 lHeadIterator = 0; 61 62 // Don‘t worry if this CAS fails. It only means some thread else has 63 // already inserted an item and set it. 64 if (std::atomic_compare_exchange_strong(&m_lHeadIterator, &lHeadIteratorOrig, lHeadIterator)) 65 { 66 // void* are always atomic (you wont set a partial pointer). 67 m_queue[lHeadIteratorOrig] = pdata; 68 69 if (m_lEventSet.test_and_set()) 70 { 71 m_bHasItem.test_and_set(); 72 } 73 return true; 74 } 75 } 76 else 77 { 78 // The queue is full. Spin a few times to check to see if an item is popped off. 79 ++uiCurrRetries; 80 } 81 } 82 return false; 83 } 84 85 bool dequeue(_TyData **ppdata) 86 { 87 if (!ppdata) 88 { 89 // Null dequeues are not allowed! 90 return false; 91 } 92 93 bool bDone = false; 94 bool bCheckQueue = true; 95 96 while (!bDone) 97 { 98 // Acquire fence in order to prevent memory reordering 99 // of any read or write with following read 100 std::atomic_thread_fence(std::memory_order_acquire); 101 //MemoryBarrier(); 102 long lTailIterator = m_lTailIterator; 103 _TyData *pdata = m_queue[lTailIterator]; 104 //volatile _TyData *pdata = m_queue[lTailIterator]; 105 if (NULL != pdata) 106 { 107 bCheckQueue = true; 108 long lTailIteratorOrig = lTailIterator; 109 110 ++lTailIterator; 111 if (lTailIterator >= m_uiCount) 112 lTailIterator = 0; 113 114 //if ( lTailIteratorOrig == atomic_cas( (volatile long*)&m_lTailIterator, lTailIterator, lTailIteratorOrig )) 115 if (std::atomic_compare_exchange_strong(&m_lTailIterator, &lTailIteratorOrig, lTailIterator)) 116 { 117 // Sets of sizeof(void*) are always atomic (you wont set a partial pointer). 118 m_queue[lTailIteratorOrig] = NULL; 119 120 // Gets of sizeof(void*) are always atomic (you wont get a partial pointer). 121 *ppdata = (_TyData*)pdata; 122 123 return true; 124 } 125 } 126 else 127 { 128 bDone = true; 129 m_lEventSet.clear(); 130 } 131 } 132 *ppdata = NULL; 133 return false; 134 } 135 136 137 long countguess() const 138 { 139 long lCount = trycount(); 140 141 if (0 != lCount) 142 return lCount; 143 144 // If the queue is full then the item right before the tail item will be valid. If it 145 // is empty then the item should be set to NULL. 146 long lLastInsert = m_lTailIterator - 1; 147 if (lLastInsert < 0) 148 lLastInsert = m_uiCount - 1; 149 150 _TyData *pdata = m_queue[lLastInsert]; 151 if (pdata != NULL) 152 return m_uiCount; 153 154 return 0; 155 } 156 157 long getmaxsize() const 158 { 159 return m_uiCount; 160 } 161 162 bool HasItem() 163 { 164 return m_bHasItem.test_and_set(); 165 } 166 167 void SetItemFlagBack() 168 { 169 m_bHasItem.clear(); 170 } 171 172 private: 173 long trycount() const 174 { 175 long lHeadIterator = m_lHeadIterator; 176 long lTailIterator = m_lTailIterator; 177 178 if (lTailIterator > lHeadIterator) 179 return m_uiCount - lTailIterator + lHeadIterator; 180 181 // This has a bug where it returns 0 if the queue is full. 182 return lHeadIterator - lTailIterator; 183 } 184 185 private: 186 std::atomic<long> m_lHeadIterator; // enqueue index 187 std::atomic<long> m_lTailIterator; // dequeue index 188 _TyData **m_queue; // array of pointers to the data 189 long m_uiCount; // size of the array 190 std::atomic_flag m_lEventSet = ATOMIC_FLAG_INIT; // a flag to use whether we should change the item flag 191 std::atomic_flag m_bHasItem = ATOMIC_FLAG_INIT; // a flag to indicate whether there is an item enqueued 192 }; 193 194 #endif //INCLUDED_UTILS_LFRINGQUEUE
/* * File: main.cpp * Author: Peng * * Created on February 22, 2014, 9:55 PM */ #include <iostream> #include <string> #include "freedeque.h" #include <sstream> #include <boost/thread/thread.hpp> #include <boost/lockfree/queue.hpp> #include <boost/atomic.hpp> #include<boost/thread/lock_guard.hpp> #include<boost/thread/mutex.hpp> #include<boost/date_time/posix_time/posix_time.hpp> const int NUM_ENQUEUE_THREAD = 5; const int NUM_DEQUEUE_THREAD = 10; const long NUM_ITEM = 50000; const long NUM_DATA = NUM_ENQUEUE_THREAD * NUM_ITEM; class Data { public: Data(int i = 0) : m_iData(i) { std::stringstream ss; ss << i; m_szDataString = ss.str(); } bool operator< (const Data & aData) const { if (m_iData < aData.m_iData) return true; else return false; } int& GetData() { return m_iData; } private: int m_iData; std::string m_szDataString; }; Data* g_arrData = new Data[NUM_DATA]; boost::mutex mtx; constexpr long size = 0.5 * NUM_DATA; lfringqueue < Data, 10000> LockFreeQueue; boost::lockfree::queue<Data*> BoostQueue(10000); bool GenerateRandomNumber_FindPointerToTheNumber_EnQueue(int n) { for (long i = 0; i < NUM_ITEM; i++) { int x = i + NUM_ITEM * n; Data* pData = g_arrData + x; LockFreeQueue.enqueue(pData); } return true; } void print(Data* pData) { if (!pData) return; boost::lock_guard<boost::mutex> lock(mtx); std::cout << pData->GetData() << std::endl; } bool Dequeue() { Data *pData = NULL; while (true) { if (LockFreeQueue.dequeue(&pData) && pData) { print(pData); } else { boost::thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(5)); } } return true; } int main(int argc, char** argv) { for (int i = 0; i < NUM_DATA; ++i) { Data data(i); //DataArray[i] = data; *(g_arrData + i) = data; } std::thread PublishThread[NUM_ENQUEUE_THREAD]; std::thread ConsumerThread[NUM_DEQUEUE_THREAD]; std::chrono::duration<double> elapsed_seconds; for (int i = 0; i < NUM_ENQUEUE_THREAD; i++) { PublishThread[i] = std::thread(GenerateRandomNumber_FindPointerToTheNumber_EnQueue, i); } for (int i = 0; i < NUM_DEQUEUE_THREAD; i++) { ConsumerThread[i] = std::thread{ Dequeue }; } for (int i = 0; i < NUM_DEQUEUE_THREAD; i++) { ConsumerThread[i].join(); } for (int i = 0; i < NUM_ENQUEUE_THREAD; i++) { PublishThread[i].join(); } delete[] g_arrData; return 0; }
说明:模板文件是原作者写的,为了验证其正确性,后面的测试程序我改写了一下,最后测试程序是无法退出来的,这里只是测试,没有进一步完善了。
在测试中发现deque应该是大小限制的,再增大data的数据程序会阻塞在某个地方没有进一步再查找原因了,以后有时候再做修改,对于一般的工程都够用了。
以上是关于Boost lockfree deque 生产者与消费者多对多线程应用的主要内容,如果未能解决你的问题,请参考以下文章
使用 sizeof(boost::lockfree::queue<std::string>) 时出错
boost::lockfree::spsc_queue 忙等待策略。有阻塞弹出吗?
evpp性能测试: 对无锁队列boost::lockfree::queue和moodycamel::ConcurrentQueue做一个性能对比测试
evpp性能测试: 对无锁队列boost::lockfree::queue和moodycamel::ConcurrentQueue做一个性能对比测试