MPSC 队列:竞争条件
Posted
技术标签:
【中文标题】MPSC 队列:竞争条件【英文标题】:MPSC Queue: Race Condition 【发布时间】:2019-07-04 11:10:27 【问题描述】:我正在尝试基于this one written in C by Dmitry Vyukov 实现一个无锁多生产者单消费者队列。
到目前为止,我编写的单个测试几乎可以正常工作。但是消费者通常会错过一个项目,要么是第一个,要么是第二个。有时,消费者会错过大约一半的输入。
就像现在一样,它不是无锁的。每次使用 new
运算符时它都会锁定,但我希望在使用分配器之前让它工作并编写一些更详尽的测试。
// src/MpscQueue.hpp
#pragma once
#include <memory>
#include <atomic>
#include <optional>
/**
* Adapted from http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
* @tparam T
*/
template< typename T >
class MpscQueue
public:
MpscQueue()
stub.next.store( nullptr );
head.store( &stub );
tail = &stub;
void push( const T& t )
emplace( t );
void push( T&& t )
emplace( std::move( t ));
template< typename ... Args >
void emplace( Args...args )
auto node = new Node std::make_unique<T>( std::forward<Args>( args )... ), nullptr ;
push( node );
/**
* Returns an item from the queue and returns a unique pointer to it.
*
* If the queue is empty returns a unique pointer set to nullptr
*
* @return A unique ptr to the popped item
*/
std::unique_ptr<T> pop()
Node* tailCopy = tail;
Node* next = tailCopy->next.load();
auto finalize = [ & ]()
tail = next;
std::unique_ptr<Node> p( tailCopy ); // free the node memory after we return
return std::move( tail->value );
;
if ( tailCopy == &stub )
if ( next == nullptr ) return nullptr;
tail = next;
tailCopy = next;
next = next->next;
if ( next ) return std::move( finalize());
if ( tail != head.load()) return nullptr;
push( &stub );
next = tailCopy->next;
return next ? std::move( finalize()) : nullptr;
private:
struct Node
std::unique_ptr<T> value;
std::atomic<Node*> next;
;
void push( Node* node )
Node* prev = head.exchange( node );
prev->next = node;
Node stub;
std::atomic<Node*> head;
Node* tail;
;
// test/main.cpp
#pragma clang diagnostic push
#pragma ide diagnostic ignored "OCUnusedMacroInspection"
#define BOOST_TEST_MODULE test_module
#pragma clang diagnostic pop
#include <boost/test/unit_test.hpp>
// test/utils.hpp
#pragma once
#include <vector>
template< class T >
void removeFromBothIfIdentical( std::vector<T>& a, std::vector<T>& b )
size_t i = 0;
size_t j = 0;
while ( i < a.size() && j < b.size())
if ( a[ i ] == b[ j ] )
a.erase( a.begin() + i );
b.erase( b.begin() + j );
else if ( a[ i ] < b[ j ] ) ++i;
else if ( a[ i ] > b[ j ] ) ++j;
namespace std
template< typename T >
std::ostream& operator<<( std::ostream& ostream, const std::vector<T>& container )
if ( container.empty())
return ostream << "[]";
ostream << "[";
std::string_view separator;
for ( const auto& item: container )
ostream << item << separator;
separator = ", ";
return ostream << "]";
template< class T >
std::vector<T> extractDuplicates( std::vector<T>& container )
auto iter = std::unique( container.begin(), container.end());
std::vector<T> duplicates;
std::move( iter, container.end(), back_inserter( duplicates ));
return duplicates;
#define CHECK_EMPTY( container, message ) \
BOOST_CHECK_MESSAGE( (container).empty(), (message) << ": " << (container) )
// test/MpscQueue.cpp
#pragma ide diagnostic ignored "cert-err58-cpp"
#include <thread>
#include <numeric>
#include <boost/test/unit_test.hpp>
#include "../src/MpscQueue.hpp"
#include "utils.hpp"
using std::thread;
using std::vector;
using std::back_inserter;
BOOST_AUTO_TEST_SUITE( MpscQueueTestSuite )
BOOST_AUTO_TEST_CASE( two_producers )
constexpr int until = 1000;
MpscQueue<int> queue;
thread producerEven( [ & ]()
for ( int i = 0; i < until; i += 2 )
queue.push( i );
);
thread producerOdd( [ & ]()
for ( int i = 1; i < until; i += 2 )
queue.push( i );
);
vector<int> actual;
thread consumer( [ & ]()
using namespace std::chrono_literals;
std::this_thread::sleep_for( 2ms );
while ( auto n = queue.pop())
actual.push_back( *n );
);
producerEven.join();
producerOdd.join();
consumer.join();
vector<int> expected( until );
std::iota( expected.begin(), expected.end(), 0 );
std::sort( actual.begin(), actual.end());
vector<int> duplicates = extractDuplicates( actual );
removeFromBothIfIdentical( expected, actual );
CHECK_EMPTY( duplicates, "Duplicate items" );
CHECK_EMPTY( expected, "Missing items" );
CHECK_EMPTY( actual, "Extra items" );
BOOST_AUTO_TEST_SUITE_END()
【问题讨论】:
您在哪个处理器架构上运行测试? @SegFault,i64。特别是 Intel(R) Core(TM) i5-7500 CPU @ 3.40GHz 你的机器测试通过了吗? 【参考方案1】:下面我的多生产者、单消费者示例是用 Ada 编写的。我将此作为虚拟“伪代码”的来源供您考虑。该示例包含三个文件。
该示例实现了一个简单的数据记录器,其中包含多个生产者、一个共享缓冲区和一个记录生产者生成的字符串的消费者。
第一个文件是共享缓冲区的包规范。 Ada 包规范定义了包中定义的实体的 API。在这种情况下,实体是一个受保护的缓冲区和一个停止记录器的过程。
-----------------------------------------------------------------------
-- Asynchronous Data Logger
-----------------------------------------------------------------------
with Ada.Strings.Unbounded; use Ada.Strings.Unbounded;
package Async_Logger is
type Queue_Index is mod 256;
type Queue_T is array (Queue_Index) of Unbounded_String;
protected Buffer is
entry Put (Log_Entry : in String);
entry Get (Stamped_Entry : out Unbounded_String);
private
Queue : Queue_T;
P_Index : Queue_Index := 0;
G_Index : Queue_Index := 0;
Count : Natural := 0;
end Buffer;
procedure Stop_Logging;
end Async_Logger;
受保护缓冲区中的条目允许任务(即线程)写入缓冲区并从缓冲区读取。这些条目会自动执行所有必要的缓冲区锁定控制。
缓冲区代码和 Stop_Logging 过程的实现在包体中实现。记录日志的消费者也在任务主体中实现,使消费者对生产线程不可见。
with Ada.Calendar; use Ada.Calendar;
with Ada.Calendar.Formatting; use Ada.Calendar.Formatting;
with Ada.Text_IO; use Ada.Text_IO;
package body Async_Logger is
------------
-- Buffer --
------------
protected body Buffer is
---------
-- Put --
---------
entry Put (Log_Entry : in String) when Count < Queue_Index'Modulus is
T_Stamp : Time := Clock;
Value : Unbounded_String :=
To_Unbounded_String
(Image (Date => T_Stamp, Include_Time_Fraction => True) & " : " &
Log_Entry);
begin
Queue (P_Index) := Value;
P_Index := P_Index + 1;
Count := Count + 1;
end Put;
---------
-- Get --
---------
entry Get (Stamped_Entry : out Unbounded_String) when Count > 0 is
begin
Stamped_Entry := Queue (G_Index);
G_Index := G_Index + 1;
Count := Count - 1;
end Get;
end Buffer;
task Logger is
entry Stop;
end Logger;
task body Logger is
Phrase : Unbounded_String;
begin
loop
select
accept Stop;
exit;
else
select
Buffer.Get (Phrase);
Put_Line (To_String (Phrase));
or
delay 0.01;
end select;
end select;
end loop;
end Logger;
procedure Stop_Logging is
begin
Logger.Stop;
end Stop_Logging;
end Async_Logger;
Put 条目有一个保护条件,允许该条目仅在缓冲区未满时执行。 Get 条目有一个保护条件,允许该条目仅在缓冲区为空时执行。
名为 Logger 的任务是消费者任务。它一直运行到它的 Stop 条目被调用为止。
Stop_Logging 过程调用 Logger 的 Stop 条目。
第三个文件是用于测试 Async_Logger 包的“主”过程。该文件创建了两个生产者,P1 和 P2。这些生产者每人向 Buffer 写入 10 条消息,然后退出。
with Async_Logger; use Async_Logger;
procedure Async_Test is
task P1;
task P2;
task body P1 is
begin
for I in 1..10 loop
Buffer.Put(I'Image);
delay 0.01;
end loop;
end P1;
task body P2 is
Num : Float := 0.0;
begin
for I in 1..10 loop
Buffer.Put(Num'Image);
Num := Num + 1.0;
delay 0.01;
end loop;
end P2;
begin
delay 0.2;
Stop_Logging;
end Async_Test;
Async_Test 过程只需等待 0.2 秒,然后调用 Stop_Logging。
运行这个程序的输出是:
2019-02-11 18:35:01.83 : 1
2019-02-11 18:35:01.83 : 0.00000E+00
2019-02-11 18:35:01.85 : 1.00000E+00
2019-02-11 18:35:01.85 : 2
2019-02-11 18:35:01.87 : 3
2019-02-11 18:35:01.87 : 2.00000E+00
2019-02-11 18:35:01.88 : 3.00000E+00
2019-02-11 18:35:01.88 : 4
2019-02-11 18:35:01.90 : 5
2019-02-11 18:35:01.90 : 4.00000E+00
2019-02-11 18:35:01.92 : 6
2019-02-11 18:35:01.92 : 5.00000E+00
2019-02-11 18:35:01.93 : 6.00000E+00
2019-02-11 18:35:01.93 : 7
2019-02-11 18:35:01.95 : 7.00000E+00
2019-02-11 18:35:01.95 : 8
2019-02-11 18:35:01.96 : 8.00000E+00
2019-02-11 18:35:01.96 : 9
2019-02-11 18:35:01.98 : 10
2019-02-11 18:35:01.98 : 9.00000E+00
【讨论】:
我以前从未使用过 ada,我不太了解它如何同步所有内容的细节。队列是线程安全的结构吗?是阻塞吗?另外,我的目标是永远不要阻塞,除非它正在获取内存以创建新节点。 Ada 保护类型和受保护对象是线程安全的。每个条目都会自动处理锁定和解锁。每个条目上的保护条件也会导致调用任务暂停,直到保护条件评估为 True。在多生产者单消费者模式中需要阻塞,因为只有当没有其他生产者写入共享缓冲区时,每个生产者才必须写入共享缓冲区,并且当生产者写入缓冲区时,消费者不能从共享缓冲区中读取。此类行为会造成竞争条件和损坏的缓冲区状态。 受保护条目中的代码可以与 C 或 C++ 中的临界区进行比较。所有这些操作都必须以防止竞争条件的方式执行。在我上面的示例中,缓冲区包含一个队列以及 P_Index、C_Index 和 Count。这些都是状态变量,必须与缓冲区上的每个操作保持一致。如果两个生产者修改同一个队列元素,结果是数据损坏或丢失。同样,两个生产者试图同时更新 P_Index 和 Count 会产生不可预知的结果。【参考方案2】:您的推送功能缺少该行:
node->next = nullptr;
在顶部。
查看我的实现以及 cmets 中的大量分析, 这里:https://github.com/CarloWood/ai-utils/blob/master/threading/MpscQueue.h
【讨论】:
链接已损坏。 谢谢!我最近将线程相关的实用程序移到了另一个 git 子模块;我现在在帖子中修复了链接。以上是关于MPSC 队列:竞争条件的主要内容,如果未能解决你的问题,请参考以下文章