在多线程中使用链表队列
Posted
技术标签:
【中文标题】在多线程中使用链表队列【英文标题】:consuming linkedlist queue in multithreads 【发布时间】:2017-03-11 20:04:41 【问题描述】:我正在学习 C++ 中的 OpenMP 并行处理库。我觉得我掌握了基本概念,并尝试通过实现链表队列来测试我的知识。我想从多个线程中使用队列。
这里的挑战是不要两次消耗同一个节点。所以我正在考虑在线程之间共享队列,但一次只允许一个线程更新(转到队列中的下一个节点)它。为此,我可以使用关键或锁定。但是,不使用它们;不知何故,它似乎工作得很好。没有发生竞争条件。
#include <iostream>
#include <omp.h>
#include <zconf.h>
struct Node
int data;
struct Node* next = NULL;
Node()
Node(int data)
this->data = data;
Node(int data, Node* node)
this->data = data;
this->next = node;
;
void processNode(Node *pNode);
struct Queue
Node *head = NULL, *tail = NULL;
Queue& add(int data)
add(new Node(data));
return *this;
void add(Node *node)
if (head == NULL)
head = node;
tail = node;
else
tail->next = node;
tail = node;
Node* remove()
Node *node;
node = head;
if (head != NULL)
head = head->next;
return node;
;
int main()
srand(12);
Queue queue;
for (int i = 0; i < 6; ++i)
queue.add(i);
double timer_started = omp_get_wtime();
omp_set_num_threads(3);
#pragma omp parallel
Node *n;
while ((n = queue.remove()) != NULL)
double started = omp_get_wtime();
processNode(n);
double elapsed = omp_get_wtime() - started;
printf("Thread id: %d data: %d, took: %f \n", omp_get_thread_num(), n->data, elapsed);
double elapsed = omp_get_wtime() - timer_started;
std::cout << "end. took " << elapsed << " in total " << std::endl;
return 0;
void processNode(Node *node)
int r = rand() % 3 + 1; // between 1 and 3
sleep(r);
输出如下所示:
Thread id: 0 data: 0, took: 1.000136
Thread id: 2 data: 2, took: 1.000127
Thread id: 2 data: 4, took: 1.000208
Thread id: 1 data: 1, took: 3.001371
Thread id: 0 data: 3, took: 2.001041
Thread id: 2 data: 5, took: 2.004960
end. took 4.00583 in total
我已经用不同数量的线程多次运行它。但是,我无法得到任何比赛条件或错误。我在想两个不同的线程可以调用“删除”并处理单个节点两次。但它没有发生。为什么?
https://github.com/muatik/openmp-examples/blob/master/linkedlist/main.cpp
【问题讨论】:
您期待什么结果?在处理队列期间,您似乎没有修改/写回变量。如果您不修改任何内容,那么您应该每次都得到相同的结果(线程激活的顺序除外——您无法控制或预测)。如果您执行某种取决于访问节点的顺序的操作,那么您可能会遇到不同的结果。或者,如果您不使用任何锁,但每个线程都写入同一个变量,您可能会遇到竞争条件。抱歉,我现在想不出任何例子。 我认为线程使用线程之间共享的队列。他们调用队列的remove
方法,队列将其头指针更改为下一个节点。我在问,是否有可能两个线程同时调用 remove() 并接收相同的节点?
是的,这是可能的。没有什么可以阻止两个线程在完成删除之前完成node = head;
。到目前为止,您是否幸运或不幸取决于您如何看待它。尝试更长的队列。
您可以只拆分数据并为每个线程提供单独的列表。那么并发调用remove
就不会成为问题了。
最简单的方法是在remove
函数顶部的locked with a std::scoped_lock
队列中使用std::mutex
成员。也许不是最有效的,但很容易。
【参考方案1】:
首先,您可以通过测试永远无法证明多线程代码是正确的。你的预感,你需要一个锁/关键部分是正确的。
您的测试在队列中特别容易。以下内容会快速打破您的队列:
for (int i = 0; i < 10000; ++i)
queue.add(i);
double timer_started = omp_get_wtime();
#pragma omp parallel
size_t counter = 0;
Node *n;
while ((n = queue.remove()) != NULL)
processNode(n);
counter++;
#pragma omp critical
std::cout << "Thread " << omp_get_thread_num() << " processed " << counter << " nodes." << std::endl;
void processNode(Node *node)
例如显示以下有趣的结果:
Thread 1 processed 11133 nodes.
Thread 0 processed 9039 nodes.
但同样,如果您使用此测试代码创建了一个正确运行一百万次的队列,并不意味着该队列已正确实现。
特别是,仅仅保护remove
是不够的,您必须正确保护对队列数据的每一次读写。要了解实现这一点的难度,请观看此excellent talk by Herb Sutter。
一般来说,我建议使用现有的并行数据结构,例如来自Boost.Lockfree。
然而,不幸的是 OpenMP 和 C++11 lock / atomic primitives don't officially play well together。所以严格来说,如果你使用 OpenMP,你应该坚持使用 OpenMP 同步原语或使用它们的库。
【讨论】:
感谢您提供翔实的回答。是的,它在你的代码中被破坏了;这就是我想要的。所以;如果我修改remove()
之类的 Node* remove() Node *node; #pragma omp critical node = head; if (head != NULL) head = head->next; return node;
方法,那么消费者线程是否不再处理同一个节点?
这可行,但前提是您不同时添加任何内容。在这种情况下,与std::vector
相比,这种队列没有什么意义。以上是关于在多线程中使用链表队列的主要内容,如果未能解决你的问题,请参考以下文章
利刃 MVVMLight 8:DispatchHelper在多线程和调度中的使用
可以在多线程环境中使用单个 QueueConnection 吗?