在多线程中使用链表队列

Posted

技术标签:

【中文标题】在多线程中使用链表队列【英文标题】:consuming linkedlist queue in multithreads 【发布时间】:2017-03-11 20:04:41 【问题描述】:

我正在学习 C++ 中的 OpenMP 并行处理库。我觉得我掌握了基本概念,并尝试通过实现链表队列来测试我的知识。我想从多个线程中使用队列。

这里的挑战是不要两次消耗同一个节点。所以我正在考虑在线程之间共享队列,但一次只允许一个线程更新(转到队列中的下一个节点)它。为此,我可以使用关键或锁定。但是,不使用它们;不知何故,它似​​乎工作得很好。没有发生竞争条件。

#include <iostream>
#include <omp.h>
#include <zconf.h>


struct Node 
    int data;
    struct Node* next = NULL;
    Node() 
    Node(int data) 
        this->data = data;
    
    Node(int data, Node* node) 
        this->data = data;
        this->next = node;
    
;

void processNode(Node *pNode);

struct Queue 
    Node *head = NULL, *tail = NULL;

    Queue& add(int data) 
        add(new Node(data));
        return *this;
    

    void add(Node *node) 
        if (head == NULL) 
            head = node;
            tail = node;
         else 
            tail->next = node;
            tail = node;
        
    

    Node* remove() 
        Node *node;
            node = head;
        if (head != NULL)
            head = head->next;

        return node;
    

;

int main() 
    srand(12);
    Queue queue;
    for (int i = 0; i < 6; ++i) 
        queue.add(i);
    

    double timer_started = omp_get_wtime();
    omp_set_num_threads(3);
    #pragma omp parallel
    
        Node *n;
        while ((n = queue.remove()) != NULL) 
            double started = omp_get_wtime();
            processNode(n);
            double elapsed = omp_get_wtime() - started;
            printf("Thread id: %d data: %d, took: %f \n", omp_get_thread_num(), n->data, elapsed);
        
    
    double elapsed = omp_get_wtime() - timer_started;

    std::cout << "end. took " << elapsed << " in total " << std::endl;
    return 0;


void processNode(Node *node) 
    int r = rand() % 3 + 1; // between 1 and 3
    sleep(r);

输出如下所示:

Thread id: 0 data: 0, took: 1.000136 
Thread id: 2 data: 2, took: 1.000127 
Thread id: 2 data: 4, took: 1.000208 
Thread id: 1 data: 1, took: 3.001371 
Thread id: 0 data: 3, took: 2.001041 
Thread id: 2 data: 5, took: 2.004960 
end. took 4.00583 in total 

我已经用不同数量的线程多次运行它。但是,我无法得到任何比赛条件或错误。我在想两个不同的线程可以调用“删除”并处理单个节点两次。但它没有发生。为什么?

https://github.com/muatik/openmp-examples/blob/master/linkedlist/main.cpp

【问题讨论】:

您期待什么结果?在处理队列期间,您似乎没有修改/写回变量。如果您不修改任何内容,那么您应该每次都得到相同的结果(线程激活的顺序除外——您无法控制或预测)。如果您执行某种取决于访问节点的顺序的操作,那么您可能会遇到不同的结果。或者,如果您不使用任何锁,但每个线程都写入同一个变量,您可能会遇到竞争条件。抱歉,我现在想不出任何例子。 我认为线程使用线程之间共享的队列。他们调用队列的remove 方法,队列将其头指针更改为下一个节点。我在问,是否有可能两个线程同时调用 remove() 并接收相同的节点? 是的,这是可能的。没有什么可以阻止两个线程在完成删除之前完成node = head;。到目前为止,您是否幸运或不幸取决于您如何看待它。尝试更长的队列。 您可以只拆分数据并为每个线程提供单独的列表。那么并发调用remove 就不会成为问题了。 最简单的方法是在remove 函数顶部的locked with a std::scoped_lock 队列中使用std::mutex 成员。也许不是最有效的,但很容易。 【参考方案1】:

首先,您可以通过测试永远无法证明多线程代码是正确的。你的预感,你需要一个锁/关键部分是正确的。

您的测试在队列中特别容易。以下内容会快速打破您的队列:

for (int i = 0; i < 10000; ++i) 
    queue.add(i);


double timer_started = omp_get_wtime();
#pragma omp parallel

    size_t counter = 0;
    Node *n;
    while ((n = queue.remove()) != NULL) 
        processNode(n);
        counter++;
    

    #pragma omp critical
    std::cout << "Thread " << omp_get_thread_num() << " processed " << counter << " nodes." << std::endl;


void processNode(Node *node) 

例如显示以下有趣的结果:

Thread 1 processed 11133 nodes.
Thread 0 processed 9039 nodes.

但同样,如果您使用此测试代码创建了一个正确运行一百万次的队列,并不意味着该队列已正确实现。

特别是,仅仅保护remove 是不够的,您必须正确保护对队列数据的每一次读写。要了解实现这一点的难度,请观看此excellent talk by Herb Sutter。

一般来说,我建议使用现有的并行数据结构,例如来自Boost.Lockfree。

然而,不幸的是 OpenMP 和 C++11 lock / atomic primitives don't officially play well together。所以严格来说,如果你使用 OpenMP,你应该坚持使用 OpenMP 同步原语或使用它们的库。

【讨论】:

感谢您提供翔实的回答。是的,它在你的代码中被破坏了;这就是我想要的。所以;如果我修改 remove() 之类的 Node* remove() Node *node; #pragma omp critical node = head; if (head != NULL) head = head-&gt;next; return node; 方法,那么消费者线程是否不再处理同一个节点? 这可行,但前提是您不同时添加任何内容。在这种情况下,与std::vector 相比,这种队列没有什么意义。

以上是关于在多线程中使用链表队列的主要内容,如果未能解决你的问题,请参考以下文章

如何在多线程环境下实现FIFO队列?

利刃 MVVMLight 8:DispatchHelper在多线程和调度中的使用

可以在多线程环境中使用单个 QueueConnection 吗?

多线程中的应用之队列(queue)

在多线程 Rails 环境中使用 Redis 的最佳方式是啥? (彪马/Sidekiq)

C++并发编程:线程安全链表