Boost lockfree deque 生产者与消费者多对多线程应用

Posted hbright

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Boost lockfree deque 生产者与消费者多对多线程应用相关的知识,希望对你有一定的参考价值。

  boost库中有一个boost::lockfree::queue类型的 队列,对于一般的需要队列的程序,其效率都算不错的了,下面使用一个用例来说明。

  程序是一个典型的生产者与消费者的关系,都可以使用多线程,其效率要比使用上层的互斥锁要快很多,因为它直接使用底层的原子操作来进行同步数据的。

  freedeque.h

  1 #pragma once#ifndef INCLUDED_UTILS_LFRINGQUEUE  
  2 #define INCLUDED_UTILS_LFRINGQUEUE  
  3 
  4 #define _ENABLE_ATOMIC_ALIGNMENT_FIX  
  5 #define ATOMIC_FLAG_INIT 0  
  6 
  7 
  8 #pragma once  
  9 
 10 
 11 #include <vector>  
 12 #include <mutex>  
 13 #include <thread>  
 14 #include <atomic>  
 15 #include <chrono>  
 16 #include <cstring>  
 17 #include <iostream>  
 18 
 19 // Lock free ring queue   
 20 
 21 template < typename _TyData, long _uiCount = 100000 >
 22 class lfringqueue
 23 {
 24 public:
 25     lfringqueue(long uiCount = _uiCount) : m_lTailIterator(0), m_lHeadIterator(0), m_uiCount(uiCount)
 26     {
 27         m_queue = new _TyData*[m_uiCount];
 28         memset(m_queue, 0, sizeof(_TyData*) * m_uiCount);
 29     }
 30 
 31     ~lfringqueue()
 32     {
 33         if (m_queue)
 34             delete[] m_queue;
 35     }
 36 
 37     bool enqueue(_TyData *pdata, unsigned int uiRetries = 1000)
 38     {
 39         if (NULL == pdata)
 40         {
 41             // Null enqueues are not allowed  
 42             return false;
 43         }
 44 
 45         unsigned int uiCurrRetries = 0;
 46         while (uiCurrRetries < uiRetries)
 47         {
 48             // Release fence in order to prevent memory reordering   
 49             // of any read or write with following write  
 50             std::atomic_thread_fence(std::memory_order_release);
 51 
 52             long lHeadIterator = m_lHeadIterator;
 53 
 54             if (NULL == m_queue[lHeadIterator])
 55             {
 56                 long lHeadIteratorOrig = lHeadIterator;
 57 
 58                 ++lHeadIterator;
 59                 if (lHeadIterator >= m_uiCount)
 60                     lHeadIterator = 0;
 61 
 62                 // Don‘t worry if this CAS fails.  It only means some thread else has  
 63                 // already inserted an item and set it.  
 64                 if (std::atomic_compare_exchange_strong(&m_lHeadIterator, &lHeadIteratorOrig, lHeadIterator))
 65                 {
 66                     // void* are always atomic (you wont set a partial pointer).  
 67                     m_queue[lHeadIteratorOrig] = pdata;
 68 
 69                     if (m_lEventSet.test_and_set())
 70                     {
 71                         m_bHasItem.test_and_set();
 72                     }
 73                     return true;
 74                 }
 75             }
 76             else
 77             {
 78                 // The queue is full.  Spin a few times to check to see if an item is popped off.  
 79                 ++uiCurrRetries;
 80             }
 81         }
 82         return false;
 83     }
 84 
 85     bool dequeue(_TyData **ppdata)
 86     {
 87         if (!ppdata)
 88         {
 89             // Null dequeues are not allowed!  
 90             return false;
 91         }
 92 
 93         bool bDone = false;
 94         bool bCheckQueue = true;
 95 
 96         while (!bDone)
 97         {
 98             // Acquire fence in order to prevent memory reordering   
 99             // of any read or write with following read  
100             std::atomic_thread_fence(std::memory_order_acquire);
101             //MemoryBarrier();  
102             long lTailIterator = m_lTailIterator;
103             _TyData *pdata = m_queue[lTailIterator];
104             //volatile _TyData *pdata = m_queue[lTailIterator];              
105             if (NULL != pdata)
106             {
107                 bCheckQueue = true;
108                 long lTailIteratorOrig = lTailIterator;
109 
110                 ++lTailIterator;
111                 if (lTailIterator >= m_uiCount)
112                     lTailIterator = 0;
113 
114                 //if ( lTailIteratorOrig == atomic_cas( (volatile long*)&m_lTailIterator, lTailIterator, lTailIteratorOrig ))  
115                 if (std::atomic_compare_exchange_strong(&m_lTailIterator, &lTailIteratorOrig, lTailIterator))
116                 {
117                     // Sets of sizeof(void*) are always atomic (you wont set a partial pointer).  
118                     m_queue[lTailIteratorOrig] = NULL;
119 
120                     // Gets of sizeof(void*) are always atomic (you wont get a partial pointer).  
121                     *ppdata = (_TyData*)pdata;
122 
123                     return true;
124                 }
125             }
126             else
127             {
128                 bDone = true;
129                 m_lEventSet.clear();
130             }
131         }
132         *ppdata = NULL;
133         return false;
134     }
135 
136 
137     long countguess() const
138     {
139         long lCount = trycount();
140 
141         if (0 != lCount)
142             return lCount;
143 
144         // If the queue is full then the item right before the tail item will be valid.  If it  
145         // is empty then the item should be set to NULL.  
146         long lLastInsert = m_lTailIterator - 1;
147         if (lLastInsert < 0)
148             lLastInsert = m_uiCount - 1;
149 
150         _TyData *pdata = m_queue[lLastInsert];
151         if (pdata != NULL)
152             return m_uiCount;
153 
154         return 0;
155     }
156 
157     long getmaxsize() const
158     {
159         return m_uiCount;
160     }
161 
162     bool HasItem()
163     {
164         return m_bHasItem.test_and_set();
165     }
166 
167     void SetItemFlagBack()
168     {
169         m_bHasItem.clear();
170     }
171 
172 private:
173     long trycount() const
174     {
175         long lHeadIterator = m_lHeadIterator;
176         long lTailIterator = m_lTailIterator;
177 
178         if (lTailIterator > lHeadIterator)
179             return m_uiCount - lTailIterator + lHeadIterator;
180 
181         // This has a bug where it returns 0 if the queue is full.  
182         return lHeadIterator - lTailIterator;
183     }
184 
185 private:
186     std::atomic<long> m_lHeadIterator;  // enqueue index  
187     std::atomic<long> m_lTailIterator;  // dequeue index  
188     _TyData **m_queue;                  // array of pointers to the data  
189     long m_uiCount;                     // size of the array  
190     std::atomic_flag m_lEventSet = ATOMIC_FLAG_INIT;       // a flag to use whether we should change the item flag  
191     std::atomic_flag m_bHasItem = ATOMIC_FLAG_INIT;        // a flag to indicate whether there is an item enqueued  
192 };
193 
194 #endif //INCLUDED_UTILS_LFRINGQUEUE  

 

  

/*
* File:   main.cpp
* Author: Peng
*
* Created on February 22, 2014, 9:55 PM
*/
#include <iostream> 
#include <string>  
#include "freedeque.h" 
#include <sstream>  
#include <boost/thread/thread.hpp>  
#include <boost/lockfree/queue.hpp>    
#include <boost/atomic.hpp>  
#include<boost/thread/lock_guard.hpp>
#include<boost/thread/mutex.hpp>
#include<boost/date_time/posix_time/posix_time.hpp>

const int NUM_ENQUEUE_THREAD = 5;
const int NUM_DEQUEUE_THREAD = 10;
const long NUM_ITEM = 50000;
const long NUM_DATA = NUM_ENQUEUE_THREAD * NUM_ITEM;

class Data {
public:
	Data(int i = 0) : m_iData(i)
	{
		std::stringstream ss;
		ss << i;
		m_szDataString = ss.str();    
	}

	bool operator< (const Data & aData) const
	{
		if (m_iData < aData.m_iData)
			return true;
		else
			return false;
	}

	int& GetData()
	{
		return m_iData;
	}
private:
	int m_iData;
	std::string m_szDataString;
};

Data* g_arrData = new Data[NUM_DATA];
boost::mutex mtx;

constexpr long size = 0.5 * NUM_DATA;
lfringqueue < Data, 10000> LockFreeQueue;
boost::lockfree::queue<Data*> BoostQueue(10000);

bool GenerateRandomNumber_FindPointerToTheNumber_EnQueue(int n)
{
	for (long i = 0; i < NUM_ITEM; i++)
	{
		int x = i + NUM_ITEM * n;
		Data* pData = g_arrData + x;
		LockFreeQueue.enqueue(pData);	
	}
	return true;
}



void print(Data* pData) {
	if (!pData)
		return;

	boost::lock_guard<boost::mutex> lock(mtx);

	std::cout << pData->GetData() << std::endl;
	
}

bool Dequeue()
{
	Data *pData = NULL;
	
	while (true)
	{
		if (LockFreeQueue.dequeue(&pData) && pData)
		{
			print(pData);
		}
		else {
			boost::thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(5));
		}
	}

	return true;
}

int main(int argc, char** argv)
{
	for (int i = 0; i < NUM_DATA; ++i)
	{
		Data data(i);
		//DataArray[i] = data;
		*(g_arrData + i) = data;
	}

	std::thread PublishThread[NUM_ENQUEUE_THREAD];
	std::thread ConsumerThread[NUM_DEQUEUE_THREAD];
	std::chrono::duration<double> elapsed_seconds;

	for (int i = 0; i < NUM_ENQUEUE_THREAD; i++)
	{
		PublishThread[i] = std::thread(GenerateRandomNumber_FindPointerToTheNumber_EnQueue, i);
	}

	for (int i = 0; i < NUM_DEQUEUE_THREAD; i++)
	{
		ConsumerThread[i] = std::thread{ Dequeue };
	}

	for (int i = 0; i < NUM_DEQUEUE_THREAD; i++)
	{
		ConsumerThread[i].join();
	}

	for (int i = 0; i < NUM_ENQUEUE_THREAD; i++)
	{
		PublishThread[i].join();
	}

	delete[] g_arrData;
	return 0;
}

  说明:模板文件是原作者写的,为了验证其正确性,后面的测试程序我改写了一下,最后测试程序是无法退出来的,这里只是测试,没有进一步完善了。

  在测试中发现deque应该是大小限制的,再增大data的数据程序会阻塞在某个地方没有进一步再查找原因了,以后有时候再做修改,对于一般的工程都够用了。

以上是关于Boost lockfree deque 生产者与消费者多对多线程应用的主要内容,如果未能解决你的问题,请参考以下文章

boost::lockfree::queue多线程读写实例

boost::lockfree::函数队列?

使用 sizeof(boost::lockfree::queue<std::string>) 时出错

boost::lockfree::spsc_queue 忙等待策略。有阻塞弹出吗?

evpp性能测试: 对无锁队列boost::lockfree::queue和moodycamel::ConcurrentQueue做一个性能对比测试

evpp性能测试: 对无锁队列boost::lockfree::queue和moodycamel::ConcurrentQueue做一个性能对比测试