阻塞队列竞争条件?

Posted

技术标签:

【中文标题】阻塞队列竞争条件?【英文标题】:Blocking queue race condition? 【发布时间】:2012-01-05 06:25:32 【问题描述】:

我正在尝试在 pthread、semaphore.h 和 gcc atomic builtins 之上实现一个由循环缓冲区支持的高性能阻塞队列。队列需要同时处理来自不同线程的多个读取器和写入器。

我已经隔离了某种竞争条件,我不确定这是否是对某些原子操作和信号量行为的错误假设,或者我的设计是否存在根本缺陷。

我已将其提取并简化为以下独立示例。我希望这个程序永远不会返回。然而,它确实会在几十万次迭代后返回,并在队列中检测到损坏。

在下面的示例中(用于说明),它实际上并没有存储任何内容,它只是将一个保存实际数据的单元格设置为 1,将一个空单元格设置为 0。有一个计数信号量(vacancys)表示空闲单元格的数量,另一个计数信号量(occupants)表示占用单元格的数量。

作家做以下事情:

    减少职位空缺 以原子方式获取下一个头部索引(mod 队列大小) 给它写信 增加居住人数

读者反其道而行之:

    减少占用人数 自动获取下一个尾索引(mod 队列大小) 从中读取 增加职位空缺

我希望鉴于上述情况,恰好一个线程可以同时读取或写入任何给定的单元格。

任何关于它为什么不起作用或调试策略的想法都值得赞赏。下面的代码和输出...

#include <stdlib.h>
#include <semaphore.h>
#include <iostream>

using namespace std;

#define QUEUE_CAPACITY 8 // must be power of 2
#define NUM_THREADS 2

struct CountingSemaphore

    sem_t m;
    CountingSemaphore(unsigned int initial)  sem_init(&m, 0, initial); 
    void post()  sem_post(&m); 
    void wait()  sem_wait(&m); 
    ~CountingSemaphore()  sem_destroy(&m); 
;

struct BlockingQueue

    unsigned int head; // (head % capacity) is next head position
    unsigned int tail; // (tail % capacity) is next tail position
    CountingSemaphore vacancies; // how many cells are vacant
    CountingSemaphore occupants; // how many cells are occupied

    int cell[QUEUE_CAPACITY];
// (cell[x] == 1) means occupied
// (cell[x] == 0) means vacant

    BlockingQueue() :
        head(0),
        tail(0),
        vacancies(QUEUE_CAPACITY),
        occupants(0)
    
        for (size_t i = 0; i < QUEUE_CAPACITY; i++)
            cell[i] = 0;
    

    // put an item in the queue
    void put()
    
        vacancies.wait();

        // atomic post increment
        set(__sync_fetch_and_add(&head, 1) % QUEUE_CAPACITY);

        occupants.post();
    

    // take an item from the queue
    void take()
    
        occupants.wait();

        // atomic post increment
        get(__sync_fetch_and_add(&tail, 1) % QUEUE_CAPACITY);

        vacancies.post();
    

    // set cell i
    void set(unsigned int i)
    
        // atomic compare and assign
        if (!__sync_bool_compare_and_swap(&cell[i], 0, 1))
        
            corrupt("set", i);
            exit(-1);
        
    

    // get cell i
    void get(unsigned int i)
    
        // atomic compare and assign
        if (!__sync_bool_compare_and_swap(&cell[i], 1, 0))
        
            corrupt("get", i);
            exit(-1);
        
    

    // corruption detected
    void corrupt(const char* action, unsigned int i)
    
        static CountingSemaphore sem(1);
        sem.wait();

        cerr << "corruption detected" << endl;
        cerr << "action = " << action << endl;
        cerr << "i = " << i << endl;
        cerr << "head = " << head << endl;
        cerr << "tail = " << tail << endl;

        for (unsigned int j = 0; j < QUEUE_CAPACITY; j++)
            cerr << "cell[" << j << "] = " << cell[j] << endl;
    
;

BlockingQueue q;

// keep posting to the queue forever
void* Source(void*)

    while (true)
        q.put();

    return 0;


// keep taking from the queue forever
void* Sink(void*)

    while (true)
        q.take();

    return 0;
 

int main()

    pthread_t id;

    // start some pthreads to run Source function
    for (int i = 0; i < NUM_THREADS; i++)
        if (pthread_create(&id, NULL, &Source, 0))
            abort();

    // start some pthreads to run Sink function
    for (int i = 0; i < NUM_THREADS; i++)
        if (pthread_create(&id, NULL, &Sink, 0))
            abort();

    while (true);

将上面的编译如下:

    $ g++ -pthread AboveCode.cpp
    $ ./a.out

每次输出都不一样,这里举一个例子:

    corruption detected
    action = get
    i = 6
    head = 122685
    tail = 122685
    cell[0] = 0
    cell[1] = 0
    cell[2] = 1
    cell[3] = 0
    cell[4] = 1
    cell[5] = 0
    cell[6] = 1
    cell[7] = 1

我的系统是 Intel Core 2 上的 Ubuntu 11.10:

    $ uname -a
    Linux 3.0.0-14-generic #23-Ubuntu SMP \
      Mon Nov 21 20:28:43 UTC 2011 x86_64 x86_64 x86_64 GNU/Linux
    $ cat /proc/cpuinfo | grep Intel
    model name : Intel(R) Core(TM)2 Quad  CPU   Q9300  @ 2.50GHz
    $ g++ --version
    g++ (Ubuntu/Linaro 4.6.1-9ubuntu3) 4.6.1

谢谢, 安德鲁。

【问题讨论】:

不幸的是,很难理解这里出了什么问题......您不使用已经存在和调试过的版本有什么原因吗?例如,在性能方面,您的版本在多核系统上可能存在错误共享问题。 虚假共享问题是指原子操作中的隐式内存屏障?强制将 Lx 缓存写回主内存等。我不确定如何避免这种情况?两个或多个读取器/写入器可能在不同 CPU 上的不同线程上运行,那么您还建议如何同步它们? 并非如此。错误共享是关于缓存争用问题,当两个不同的内核访问不同的变量时......恰好落在同一个缓存行中。由于独占所有权(写入所需)是在缓存行的基础上协商的,因此两个内核必须序列化它们的操作,即使它们正在访问和修改两个语义不同的变量。它可能真的会损害性能......并且现有的优化实现已经解决了这个问题:) 现有的哪些实现解决了这个问题,如何解决?您能否提供一个文件/行参考作为示例。 【参考方案1】:

一种可能的情况,针对两个写入线程(W0、W1)和一个读取线程(R0)逐步跟踪。 W0 比 W1 更早进入 put(),被操作系统或硬件中断,后来完成。

        w0 (core 0)               w1 (core 1)                r0
t0         ----                      ---       blocked on occupants.wait() / take
t1      entered put()                ---                    ---         
t2      vacancies.wait()           entered put()            ---
t3      got new_head = 1           vacancies.wait()         ---
t4     <interrupted by OS>         got new_head = 2         ---
t5                                 written 1 at cell[2]     ---
t6                                 occupants.post();        ---
t7                                 exited put()            waked up
t8                                   ---               got new_tail = 1
t9     <still in interrupt>          ---    read 0 from ceil[1]  !! corruption !!
t10     written 1 at cell[1]                           
t11     occupants.post();
t12     exited put()

【讨论】:

是的,就是这样,非常感谢。仅仅因为作者在进入时按原子递增的序列号排序,并不意味着它们将以相同的顺序发布给读者。需要第二个序列号(即 head_start、head_end、tail_start、tail_end),并且只有在读/写操作完成后才增加 *_end 并发布信号量直到“低水位线”。 很高兴听到这个消息。祝您实验顺利!【参考方案2】:

从设计的角度来看,我会将整个队列视为共享资源,并使用单个互斥锁来保护它。

作家做以下事情:

    获取互斥锁 写入队列(包括处理索引) 释放互斥锁

读者做以下事情:

    获取互斥锁 从队列中读取(包括处理索引) 释放互斥锁

【讨论】:

这将提供不必要的读取器和写入器序列化。假设队列为空并且 QUEUE_CAPACITY 写入者来自 QUEUE_CAPACITY 不同的线程。理想情况下,您希望 QUEUE_CAPACITY 构造函数在 QUEUE_CAPACITY 缓冲区的不同项目上并行运行。根据您的设计,在等待互斥体时,一次只能运行一个构造函数。 虽然这种方法会引入序列化,但也更容易实现正确性。【参考方案3】:

我有一个理论。这是一个循环队列,因此一个读取线程可能会被占用。假设读者获取索引 0。在它做任何事情之前它会失去 CPU。另一个读取器线程获取索引 1,然后是 2,然后是 3……然后是 7,然后是 0。第一个读取器醒来,两个线程都认为他们对索引 0 具有独占访问权。不知道如何证明这一点。希望对您有所帮助。

【讨论】:

读取器(和写入器)需要等待计数信号量才能进入函数体。说队列已满。 occupants = 8 and vacancies = 0。单元 0 上的读卡器将访问减少到 7,单元 1 上的读卡器将访问减少到 6,...,单元 7 上的读卡器将访问减少到 0。第二个单元格 0 上的读取器阻塞,因为没有剩余计数信号量。他会一直阻塞,直到读者退出,将空缺增加到 1,然后写者进来递减空缺,写入然后增加占用。 AARGHH... 我已经想通了,如果不是读者 0 首先退出并发布空缺,而是读者 4(比如说)。然后作者会进来并覆盖单元格 0。这就是问题所在,现在解决方案是什么。 :) 但是队列不一定要满一圈。它一次只需要一个条目。 住户 + 空置率 当我说 lap 时,我的意思是容量为 n 的循环队列在每插入 n 次后返回索引 0,即 head % n == 0 for head = 0, head = n, head = 2 * n等等。我不是在谈论违反容量。不过,我们描述的是同样的问题。

以上是关于阻塞队列竞争条件?的主要内容,如果未能解决你的问题,请参考以下文章

用C++11实现一个有界的阻塞队列

阻塞队列

第238天学习打卡(知识点回顾 阻塞队列)

java并发阻塞队列的使用

linux内核阻塞IO

C中的非繁忙阻塞队列实现