多线程单读单写fifo队列
Posted
技术标签:
【中文标题】多线程单读单写fifo队列【英文标题】:Multithreaded single-reader single-writer fifo queue 【发布时间】:2010-11-29 17:10:15 【问题描述】:我需要一个用于将消息从一个线程 (A) 传递到另一个线程 (B) 的队列,但是我无法找到一个真正满足我需求的队列,因为它们通常允许添加一个项目失败,这种情况在我的情况下是非常致命的,因为需要处理消息,并且线程真的不能停止并等待空闲空间。
只有线程 A 添加项目,只有线程 B 读取它们 线程 A 绝不能阻塞,但是线程 B 对性能并不重要,因此它可以 添加项目必须始终成功,因此队列不能有大小上限(系统内存不足) 如果队列为空,线程 B 应该等待,直到有项目要处理【问题讨论】:
您使用的是什么线程库?线程? boost::thread 和一些平台特定的代码在这里和那里 您的目标可能会导致内存不足,因为您不允许编写器线程阻止或删除项目。因此,如果您达到队列的临界大小限制,您必须决定是丢弃项目还是阻止写入线程。否则你会因为程序失败而间接丢弃物品:-) 嗯,队列最有可能一次有 【参考方案1】:以下是如何在 C++ 中编写无锁队列:
http://www.ddj.com/hpc-high-performance-computing/210604448
但是当你说“线程 A 不能阻塞”时,你确定这是要求吗? Windows 不是实时操作系统(正常使用的 linux 也不是)。如果您希望线程 A 能够使用所有可用的系统内存,那么它需要分配内存(或等待其他人这样做)。如果读取器和写入器都使用进程内锁(即非共享互斥锁)来操作列表,则操作系统本身无法提供比您更好的时间保证。添加消息的最坏情况是必须去操作系统获取内存。
简而言之,您不喜欢的那些队列具有固定容量是有原因的 - 这样他们就不必在所谓的低延迟线程中分配内存。
因此,无锁代码通常会更少块,但由于内存分配的原因,它不能保证如此,除非你有一个真正巨大的流,否则使用互斥锁的性能不应该那么糟糕要处理的事件(例如,您正在编写网络驱动程序并且消息是传入的以太网数据包)。
所以,在伪代码中,我首先要尝试的是:
Writer:
allocate message and fill it in
acquire lock
append node to intrusive list
signal condition variable
release lock
Reader:
for(;;)
acquire lock
for(;;)
if there's a node
remove it
break
else
wait on condition variable
endif
endfor
release lock
process message
free message
endfor
只有当这被证明会在编写线程中引入不可接受的延迟时,我才会使用无锁代码(除非我碰巧已经有一个合适的队列)。
【讨论】:
在较低级别上,可以使用单链表,其中写入过程附加,读取过程消耗。这可以是无锁的,写入过程将 NULL 指针更改为非 NULL,而读取过程将非 NULL 更改为 NULL。一个小的私有堆将为列表项提供良好的摊销性能。作家 mallocs 和读者释放。如果读者进入睡眠状态,可以提供第三个进程 C,它会推测性地扩大私有堆,从而对进程 A 隐藏分配的阻塞性质。 你的例子会死锁。当读者在等待条件时,它持有锁,这会阻止作者获取锁和信号。您需要在等待条件变量之前释放锁,然后立即重新获取。 @Bobby:你错了。等待条件变量会在等待期间释放关联的锁,然后在从等待返回之前重新获取它。这是“条件变量”含义的一部分——如果您使用的 API 没有为您做到这一点,那么它就不是条件变量,它更像是一个信号量。 API 做到这一点很重要,因为那时你的代码可以依赖于释放锁并开始等待条件的事实是原子发生的——也就是说,在你的线程被锁之前,没有其他线程可以在锁下做任何事情。服务员。【参考方案2】:您可能需要考虑您的要求 - A 真的不能丢弃任何队列项目吗?还是您不希望 B 从队列中拉出两个不是连续项目的连续元素,因为这会以某种方式歪曲事件序列?
例如,如果这是某种数据记录系统,您(可以理解)不希望记录中出现空白 - 但如果没有无限的内存,现实情况是,在某些极端情况下,您可能会超出您的队列容量..
在这种情况下,一种解决方案是让某种特殊元素可以放入队列中,这代表 A 发现它必须丢弃项目的情况。基本上你会保留一个额外的元素,大多数时候它是空的。每次 A 去向队列添加元素时,如果这个额外的元素不为空,则进入。如果 A 发现队列中没有空间,那么它将这个额外的元素配置为说“嘿,队列已满” .
这样,A 永远不会阻塞,您可以在系统非常繁忙时删除元素,但您不会忘记删除元素的事实,因为一旦队列空间可用,此标记就会进入指示发生数据丢失的位置。然后,当进程 B 发现它已将这个溢出标记元素从队列中拉出时,它会执行它需要做的任何事情。
【讨论】:
【参考方案3】:Visual Studio 2010 正在添加 2 个很好地支持此方案的新库,Asynchronous Agents Library 和并行模式库。
代理库支持或异步消息传递,并包含用于向“目标”发送消息和从“源”接收消息的消息块
unbounded_buffer 是一个模板类,它提供了我相信您正在寻找的东西:
#include <agents.h>
#include <ppl.h>
#include <iostream>
using namespace ::Concurrency;
using namespace ::std;
int main()
//to hold our messages, the buffer is unbounded...
unbounded_buffer<int> buf1;
task_group tasks;
//thread 1 sends messages to the unbounded_buffer
//without blocking
tasks.run([&buf1]()
for(int i = 0 ; i < 10000; ++i)
send(&buf1,i)
//signal exit
send(&buf1,-1);
);
//thread 2 receives messages and blocks if there are none
tasks.run([&buf1]()
int result;
while(result = receive(&buf1)!=-1)
cout << "I got a " << result << endl;
);
//wait for the threads to end
tasks.wait();
【讨论】:
真的可以在 Linux 类别下运行吗? FWIW,在您的接收循环中,您将始终输出“I got a 1”,因为 != 在 = 之前被评估【参考方案4】:为什么不使用 STL list> 或 deque> 与围绕添加/删除的互斥锁? thread-safety of STL不够吗?
为什么不创建自己的(单/双)包含指针的链表节点类,并让要添加/删除的项目继承自该类?因此不需要额外的分配。你只需在threadA::add()
和threadB::remove()
中找到几个指针,就完成了。 (虽然您想在互斥锁下执行此操作,但对 threadA 的阻塞影响可以忽略不计,除非您确实做错了什么......)
如果您使用的是 pthread,请查看 sem_post()
和 sem_wait()
。这个想法是 threadB 可以通过 sem_wait()
无限期地阻塞,直到 threadA 将某些东西放入队列中。然后threadA调用sem_post()
。这会唤醒 threadB 来完成它的工作。之后threadB可以重新进入睡眠状态。这是一种处理异步信号的有效方式,在 threadB::remove()
完成之前支持多个 threadA::add()
之类的事情。
【讨论】:
以上是关于多线程单读单写fifo队列的主要内容,如果未能解决你的问题,请参考以下文章
iOS开发:深入理解GCD 第二篇(dispatch_groupdispatch_barrier基于线程安全的多读单写)
v81.01 鸿蒙内核源码分析(读写锁篇) | 内核如何实现多读单写 | 百篇博客分析OpenHarmony源码
v81.01 鸿蒙内核源码分析(读写锁篇) | 内核如何实现多读单写 | 百篇博客分析OpenHarmony源码