C/C++线程安全型队列的实现

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C/C++线程安全型队列的实现相关的知识,希望对你有一定的参考价值。

编写一个线程安全的队列,所谓线程安全,就是该队列能够实现多个线程同时正确的增删改队列结点,也就是能够实现对队列这个临界资源的保护。需要实现的函数包括:
(1) InitQueue函数:初始化一个空的队列,并初始化各个用于保护队列的信号量。
(2) EnQueue函数:在队列尾部加入一个结点
(3) DeQueue函数:删除队列头部结点
(4) Clear函数:删除队列中的所有结点
(5) Find函数:查找队列中是否有指定的元素,若有,返回能够访问该结点的指针;若无,返回NULL。
(6) Print函数:打印当前队列中的所有元素。
完成该队列后,自己编写一个测试程序,生成多个线程同时读写该队列,验证你的队列执行是否正确。
我基本不会,所以请具体给出代码(可以把代码发我邮箱yiboyuntianzhe@163.com),简单的介绍我不会采纳,我会先测试,无错的话可以追加悬赏分。
其中线性安全请用到互斥量Mutex,这是我先在在学的,所以很关键。
谢谢各位大神。
注:我不是要用RMB买设计,我只是到网上寻求帮助,卖设计的人请绕道,谢谢。

首先,互斥量这种线程相关的内容是平台相关的,我假设你用的是windows平台开发。
其次,说明一下我的开发环境,vs2008,控制台程序,空的工程。
最后给你贴代码,分文件来看。

===头文件QueueNode.h===
===你需要的节点数据可能不是整数,只要将typedef int QUEUEDATA这一句的int换成你想要的类型即可,但要注意,这个类型必须实现赋值操作符重载,相等比较操作符重载,以及复制构造函数===

#ifndef _QUEUE_NODE_H_
#define _QUEUE_NODE_H_

typedef int QUEUEDATA;

typedef struct node

QUEUEDATA data;
node* m_pNext;
QUEUENODE;

#endif

===队列头文件Queue.h,有平台相关内容,请注意===
#ifndef _QUEUE_H_
#define _QUEUE_H_

#include "QueueNode.h"
#include <Windows.h>

class ThreadSafeQueue

public:
ThreadSafeQueue();
virtual ~ThreadSafeQueue();

bool InitQueue();
void EnQueue(const QUEUEDATA& data);
void DeQueue();
void Clear();
const QUEUENODE* Find(const QUEUEDATA& data) const;
void Print();

protected:
HANDLE m_hMutex;
QUEUENODE* m_pQueueHead;
;

#endif

===队列函数实现文件Queue.cpp===
#include "Queue.h"
#include <iostream>

ThreadSafeQueue::ThreadSafeQueue()

m_pQueueHead = new QUEUENODE;
m_pQueueHead->m_pNext = 0;


ThreadSafeQueue::~ThreadSafeQueue()

Clear();
delete m_pQueueHead;
CloseHandle(m_hMutex);


bool ThreadSafeQueue::InitQueue()

m_hMutex = CreateMutex(0, FALSE, 0);
return (m_hMutex!=0);


void ThreadSafeQueue::EnQueue(const QUEUEDATA& data)

WaitForSingleObject(m_hMutex, INFINITE);
QUEUENODE* pNode = new QUEUENODE;
pNode->data = data;
pNode->m_pNext = 0;
QUEUENODE* pTemp = m_pQueueHead;
while (pTemp->m_pNext != 0)

pTemp = pTemp->m_pNext;

pTemp->m_pNext = pNode;
ReleaseMutex(m_hMutex);


void ThreadSafeQueue::DeQueue()

WaitForSingleObject(m_hMutex, INFINITE);
QUEUENODE* pNode = m_pQueueHead->m_pNext;
if (pNode != 0)

m_pQueueHead->m_pNext = pNode->m_pNext;
delete pNode;
pNode = 0;

ReleaseMutex(m_hMutex);


const QUEUENODE* ThreadSafeQueue::Find(const QUEUEDATA& data) const

WaitForSingleObject(m_hMutex, INFINITE);
QUEUENODE* pNode = m_pQueueHead->m_pNext;
while (pNode != 0)

if (pNode->data == data)

break;

pNode = pNode->m_pNext;

return pNode;
ReleaseMutex(m_hMutex);


void ThreadSafeQueue::Clear()

WaitForSingleObject(m_hMutex, INFINITE);
QUEUENODE* pNode = m_pQueueHead->m_pNext;
QUEUENODE* pTemp = 0;
while (pNode != 0)

pTemp = pNode->m_pNext;
delete pNode;
pNode = pTemp;

m_pQueueHead->m_pNext = 0;
ReleaseMutex(m_hMutex);


void ThreadSafeQueue::Print()

WaitForSingleObject(m_hMutex, INFINITE);
QUEUENODE* pNode = m_pQueueHead->m_pNext;
while (pNode != 0)

std::cout << pNode->data << "\t";
pNode = pNode->m_pNext;

std::cout << std::endl;
ReleaseMutex(m_hMutex);


===测试代码文件main.cpp,包含了测试用可执行程序,两个操作queue的线程,需要说明的是,我本来打算用WaitMultipleObjects函数来等待两个线程都结束,但是没搞清楚是什么问题没有卡住,不打算继续纠缠它了,所以让主线程Sleep了5秒钟===
#include "Queue.h"
#include <iostream>

DWORD WINAPI HandleQueue(void* pParam);
DWORD WINAPI HandleQueue2(void* pParam);

int main()

ThreadSafeQueue queue;
queue.InitQueue();
HANDLE hThread[2] = 0;
DWORD threadID = 0;
hThread[0] = CreateThread(NULL, 0, HandleQueue, (void*)(&queue), NULL, &threadID);
hThread[0] = CreateThread(NULL, 0, HandleQueue2, (void*)(&queue), NULL, &threadID);

//WaitForMultipleObjects(2, hThread, TRUE, INFINITE);
Sleep(5000);
queue.Print();
queue.Clear();
return 0;


DWORD WINAPI HandleQueue(void* pParam)

ThreadSafeQueue* pQueue = reinterpret_cast<ThreadSafeQueue*>(pParam);
for (int i = 0; i < 100; i++)

std::cout << "HandleQueue EnQueue" << std::endl;
pQueue->EnQueue(i);

for (int i = 0; i < 50; i++)

std::cout << "HandleQueue DeQueue" << std::endl;
pQueue->DeQueue();

return 0;


DWORD WINAPI HandleQueue2(void* pParam)

ThreadSafeQueue* pQueue = reinterpret_cast<ThreadSafeQueue*>(pParam);
for (int i = 0; i < 100; i++)

std::cout << "HandleQueue2 EnQueue" << std::endl;
pQueue->EnQueue(i+100);

for (int i = 0; i < 50; i++)

std::cout << "HandleQueue2 DeQueue" << std::endl;
pQueue->DeQueue();

return 0;


新建一个空的控制台程序工程,向工程中加入这几个文件,编译之后可以直接运行。
第一个线程投入队列100个元素,出队50个元素,第二个线程同样。最后主线程输出队列中最后的内容,然后清空。
队列用链表实现,可以试想一下,如果线程同步没有处理,指针操作时一定会引起崩溃追问

'QueueNode.h': No such file or directory
大哥,头文件也拜托写一下吧。

追答

我的第一个文件就是QueueNode.h,请看清楚
文件如下:
1. QueueNode.h
2. Queue.h
3. Queue.cpp
4. main.cpp
都是以“===”夹在中间分割的,请仔细看

参考技术A 发你邮箱里了追问

大哥,你给的2个,其中第一个都很多error,另外一个你给的模版好像可以,不过我的库函数似乎少一个文件,他报了一个错误是fatal error C1083: Cannot open include file: 'pthread.h': No such file or directory。先谢谢你。你能告诉我你的QQ吗?有些问题请帮我改一下,我想把我们老师给我们的代码给你看一下,你用的Synchronized类我们应该没学,这个我不好给老师解释。
求你帮我到底啊,我的所有财富值都可以给你。急急急求

追答

如果单纯是队列我肯定帮你,但这题要求生成多线程,这我就无能为力了,Synchronized类表示线程中的同步,如果编多线程这些基本的应该得会,我只能帮你到这一步,实在抱歉了

c++11 并发队列的生产方案 BlockingConcurrentQueue

ConcurrentQueue

这是我在生产中使用到的一个c++11 实现的无锁队列,有以下特性:

1、线程安全的无锁队列
2、支持c++11 的move语义,优化对象拷贝性能
3、模板化
4、可预分配内存、也可动态分配
5、支持批量处理
6、包含阻塞队列
7、异常安全

demo

将github里的文件 Blockingconcurrentqueue.h 跟 Concurrentqueue.h 加载到项目中

#include "Blockingconcurrentqueue.h"
//声明队列
moodycamel::BlockingConcurrentQueue<T> g_myQueue;

//工作函数监听队列消息
void concumer() {
	while (!_bStop) {
		T t;
		g_myQueue.wait_dequeue(T);
	}
}

void producer() {
	T t;
    g_myQueue.enqueue(std::move(t));
}

原理

CAS保证线程安全

首先,线程安全通常采用互斥量,或者条件变量,本质就是锁机制,或者前文介绍过的Dirsuptor采用的CAS机制,这两种机制其实也是我们常说的悲观锁、乐观锁模式。
互斥量:需要获取到锁才能进入。
CAS:通常可能一次比对就可以进入逻辑块,或者多试几次进行更新值。

stl提供的cas操作:

bool atomic_compare_exchange_weak (atomic* obj, T* expected, T val);

首先函数会将 obj 与 expected 的内容作比较:

如果相等,那么将交换 obj 和 val 的值,并返回 true。
如果不相等,则什么也不做,之后返回 false。

ConcurrentQueue 实现的线程安全以及高性能所使用到的也便是CAS机制。

c++11 move语义

// Enqueues a single item (by moving it, if possible).
	// Allocates memory if required. Only fails if memory allocation fails (or implicit
	// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
	// or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
	// Thread-safe.
	inline bool enqueue(T&& item)
	{
		if ((details::likely)(inner.enqueue(std::move(item)))) {
			sema->signal();
			return true;
		}
		return false;
	}

工作线程放入一个元素时,采用move语义减少对象创建拷贝

模板化内存管理

template<AllocationMode canAlloc, typename U>
	inline bool inner_enqueue(U&& element)
	{
		auto producer = get_or_add_implicit_producer();
		return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
	}

当不同的生产者生产元素时,每个生产者在总队列里时采用子队列形式隔离的,每个生产者的元素采用块的形式存储而非链表方式,提供存储性能,但这样也造成了一个问题:当多个生产者间的元素有顺序要求时,整体队列无法做到。比如 a生产a消息,b生产b消息,宏观上可能ab有先后顺序,但是消费时,因为a跟b时存储在不同的块,出队的顺序独立,所以有可能存在ba的出队顺序。在生产过程中需要区分好队列的生产跟消费逻辑

看过STL的源码,大家应该队AllocationMode会有大概的认知吧。

以上是关于C/C++线程安全型队列的实现的主要内容,如果未能解决你的问题,请参考以下文章

C中具有共享队列的线程安全生产者/消费者

C++并发编程----无锁实现线程安全队列(《C++ Concurrency in Action》 读书笔记)

将 Swift 调用同步到基于 C 的线程不安全库

c++ string线程安全吗

基于event 实现的线程安全的优先队列(python实现)

MaxMind 的 GeoIP C 实现线程安全吗?