C++0x 没有信号量?如何同步线程?
Posted
技术标签:
【中文标题】C++0x 没有信号量?如何同步线程?【英文标题】:C++0x has no semaphores? How to synchronize threads? 【发布时间】:2011-06-15 02:44:58 【问题描述】:C++0x 真的会没有信号量吗? Stack Overflow 上已经有一些关于信号量使用的问题。我一直使用它们(posix 信号量)让一个线程等待另一个线程中的某个事件:
void thread0(...)
doSomething0();
event1.wait();
...
void thread1(...)
doSomething1();
event1.post();
...
如果我要使用互斥锁来做到这一点:
void thread0(...)
doSomething0();
event1.lock(); event1.unlock();
...
void thread1(...)
event1.lock();
doSomethingth1();
event1.unlock();
...
问题:很难看,并且不能保证thread1首先锁定互斥锁(鉴于同一个线程应该锁定和解锁互斥锁,您也无法在thread0和thread1启动之前锁定event1)。
那么既然 boost 也没有信号量,那么实现上述目标的最简单方法是什么?
【问题讨论】:
也许使用条件 mutex 和 std::promise 和 std::future? 【参考方案1】:您可以轻松地从互斥锁和条件变量构建一个:
#include <mutex>
#include <condition_variable>
class semaphore
std::mutex mutex_;
std::condition_variable condition_;
unsigned long count_ = 0; // Initialized as locked.
public:
void release()
std::lock_guard<decltype(mutex_)> lock(mutex_);
++count_;
condition_.notify_one();
void acquire()
std::unique_lock<decltype(mutex_)> lock(mutex_);
while(!count_) // Handle spurious wake-ups.
condition_.wait(lock);
--count_;
bool try_acquire()
std::lock_guard<decltype(mutex_)> lock(mutex_);
if(count_)
--count_;
return true;
return false;
;
【讨论】:
有人应该向标准委员会提交提案 这里的一条评论最初让我感到困惑的是等待中的锁,有人可能会问,如果等待持有锁,线程如何通过通知?有点模糊记录的答案是 condition_variable.wait 脉冲锁,允许另一个线程以原子方式通过通知,至少我是这样理解的 它被故意排除在 Boost 之外,因为信号量对于程序员来说太难缠了。据说条件变量更易于管理。我明白他们的意思,但觉得有点自大。我假设相同的逻辑适用于 C++11——程序员应该以“自然地”使用 condvars 或其他认可的同步技术的方式编写程序。无论是在 condvar 之上还是在本机上实现,提供一个信号量都会与此相反。 注意 - 请参阅 en.wikipedia.org/wiki/Spurious_wakeup 了解while(!count_)
循环背后的基本原理。
@Maxim 对不起,我不认为你是对的。 sem_wait 和 sem_post 也仅在争用时进行系统调用(检查sourceware.org/git/?p=glibc.git;a=blob;f=nptl/sem_wait.c),因此此处的代码最终会复制 libc 实现,并可能存在错误。如果您打算在任何系统上实现可移植性,这可能是一个解决方案,但如果您只需要 Posix 兼容性,请使用 Posix 信号量。【参考方案2】:
基于Maxim Yegorushkin's answer,我尝试用C++11风格制作示例。
#include <mutex>
#include <condition_variable>
class Semaphore
public:
Semaphore (int count_ = 0)
: count(count_)
inline void notify()
std::unique_lock<std::mutex> lock(mtx);
count++;
cv.notify_one();
inline void wait()
std::unique_lock<std::mutex> lock(mtx);
while(count == 0)
cv.wait(lock);
count--;
private:
std::mutex mtx;
std::condition_variable cv;
int count;
;
【讨论】:
你可以让 wait() 也变成三行:cv.wait(lck, [this]() return count > 0; );
本着 lock_guard 的精神添加另一个类也很有帮助。在 RAII 方式中,将信号量作为引用的构造函数调用信号量的 wait() 调用,而析构函数调用其 notify() 调用。这可以防止异常无法释放信号量。
是不是有死锁,如果说N个线程调用了wait()并且count==0,那么cv.notify_one();从未调用过,因为 mtx 尚未发布?
@Marcello 等待线程不持有锁。条件变量的全部意义在于提供原子的“解锁并等待”操作。
您应该在调用 notify_one() 之前释放锁定以避免立即阻塞唤醒...请参阅此处:en.cppreference.com/w/cpp/thread/condition_variable/notify_all【参考方案3】:
我决定尽可能多地以标准的风格编写最健壮/通用的 C++11 信号量(注意using semaphore = ...
,您通常只使用类似于通常的名称semaphore
使用string
而不是basic_string
):
template <typename Mutex, typename CondVar>
class basic_semaphore
public:
using native_handle_type = typename CondVar::native_handle_type;
explicit basic_semaphore(size_t count = 0);
basic_semaphore(const basic_semaphore&) = delete;
basic_semaphore(basic_semaphore&&) = delete;
basic_semaphore& operator=(const basic_semaphore&) = delete;
basic_semaphore& operator=(basic_semaphore&&) = delete;
void notify();
void wait();
bool try_wait();
template<class Rep, class Period>
bool wait_for(const std::chrono::duration<Rep, Period>& d);
template<class Clock, class Duration>
bool wait_until(const std::chrono::time_point<Clock, Duration>& t);
native_handle_type native_handle();
private:
Mutex mMutex;
CondVar mCv;
size_t mCount;
;
using semaphore = basic_semaphore<std::mutex, std::condition_variable>;
template <typename Mutex, typename CondVar>
basic_semaphore<Mutex, CondVar>::basic_semaphore(size_t count)
: mCountcount
template <typename Mutex, typename CondVar>
void basic_semaphore<Mutex, CondVar>::notify()
std::lock_guard<Mutex> lockmMutex;
++mCount;
mCv.notify_one();
template <typename Mutex, typename CondVar>
void basic_semaphore<Mutex, CondVar>::wait()
std::unique_lock<Mutex> lockmMutex;
mCv.wait(lock, [&] return mCount > 0; );
--mCount;
template <typename Mutex, typename CondVar>
bool basic_semaphore<Mutex, CondVar>::try_wait()
std::lock_guard<Mutex> lockmMutex;
if (mCount > 0)
--mCount;
return true;
return false;
template <typename Mutex, typename CondVar>
template<class Rep, class Period>
bool basic_semaphore<Mutex, CondVar>::wait_for(const std::chrono::duration<Rep, Period>& d)
std::unique_lock<Mutex> lockmMutex;
auto finished = mCv.wait_for(lock, d, [&] return mCount > 0; );
if (finished)
--mCount;
return finished;
template <typename Mutex, typename CondVar>
template<class Clock, class Duration>
bool basic_semaphore<Mutex, CondVar>::wait_until(const std::chrono::time_point<Clock, Duration>& t)
std::unique_lock<Mutex> lockmMutex;
auto finished = mCv.wait_until(lock, t, [&] return mCount > 0; );
if (finished)
--mCount;
return finished;
template <typename Mutex, typename CondVar>
typename basic_semaphore<Mutex, CondVar>::native_handle_type basic_semaphore<Mutex, CondVar>::native_handle()
return mCv.native_handle();
【讨论】:
这工作,稍作修改。使用谓词调用wait_for
和wait_until
方法返回一个布尔值(不是`std::cv_status)。
抱歉这么晚才挑剔。 std::size_t
是无符号的,因此将其递减到零以下是 UB,它始终是 >= 0
。恕我直言count
应该是int
。
@RichardHodges 没有办法减少到零以下,所以没有问题,信号量的负数意味着什么?这甚至没有意义海事组织。
@David 如果一个线程必须等待其他人初始化事情怎么办?例如,1 个阅读器线程等待 4 个线程,我会使用 -3 调用信号量构造函数,以使阅读器线程等待,直到所有其他线程都发帖。我想还有其他方法可以做到这一点,但这不合理吗?我认为这实际上是 OP 提出的问题,但有更多“thread1”。
@RichardHodges 非常迂腐,将无符号整数类型递减到 0 以下不是 UB。【参考方案4】:
根据 posix 信号量,我会添加
class semaphore
...
bool trywait()
boost::mutex::scoped_lock lock(mutex_);
if(count_)
--count_;
return true;
else
return false;
;
而且我更喜欢在方便的抽象级别上使用同步机制,而不是总是使用更基本的运算符复制粘贴拼接在一起的版本。
【讨论】:
【参考方案5】:C++20 终于有了信号量 - std::counting_semaphore<max_count>
。
这些有(至少)以下方法:
acquire()
(阻止)
try_acquire()
(非阻塞,立即返回)
try_acquire_for()
(非阻塞,需要一段时间)
try_acquire_until()
(非阻塞,需要一段时间才能停止尝试)
release()
您可以阅读these CppCon 2019 presentation slides,或观看video。还有官方的提案P0514R4,不过可能跟实际的C++20不同步。
【讨论】:
@Sandburg:据我所知,确实如此。 好的,是的include <semaphore>
很难找到...在这个主题上有很多“提升”噪音。
@Sandburg:我确实发布了一个链接。您还可以查看有关 C++20 的 Wikipedia 页面。
我在任何地方都找不到信号量头文件。有什么建议吗?
@VishaalSelvaraj: 1. 搜索库包文件。 2.使用搜索引擎检查其他人是否遇到过这个问题。在这里问另一个关于 SO 的问题,并确保包括所有细节(操作系统发行版、编译器、标准库等)【参考方案6】:
您还可以查看cpp11-on-multicore - 它具有可移植且最佳的信号量实现。
存储库还包含补充 c++11 线程的其他线程好东西。
【讨论】:
【参考方案7】:您可以使用互斥锁和条件变量。您获得互斥锁的独占访问权限,检查您是要继续还是需要等待另一端。如果你需要等待,你就在一个条件下等待。当其他线程确定您可以继续时,它会发出条件信号。
boost::thread 库中有一个简短的example,您很可能只需要复制它(C++0x 和 boost 线程库非常相似)。
【讨论】:
条件信号是否只发送给等待线程?所以如果thread0在thread1发出信号时没有等待它稍后会被阻塞?另外:我不需要条件附带的额外锁 - 这是开销。 是的,条件仅表示等待线程。常见的模式是有一个带有状态的变量和一个条件,以防您需要等待。想想生产者/消费者,缓冲区中的项目会有一个计数,生产者锁定,添加元素,增加计数和信号。消费者锁定,检查计数器,如果非零消费,如果零则等待条件。 你可以这样模拟一个信号量:用你给信号量的值初始化一个变量,然后wait()
被翻译成“锁定,检查计数是否非零递减并继续;如果零等待条件”,而 post
将是“锁定,递增计数器,如果为 0 则发出信号”
是的,听起来不错。我想知道posix信号量是否以相同的方式实现。
@tauran:我不确定(这可能取决于哪个 Posix 操作系统),但我认为不太可能。信号量传统上是比互斥锁和条件变量“低级”的同步原语,原则上可以比在 condvar 之上实现更有效。因此,在给定的操作系统中,所有用户级同步原语更有可能建立在与调度程序交互的一些常用工具之上。【参考方案8】:
也可以是线程中有用的 RAII 信号量包装器:
class ScopedSemaphore
public:
explicit ScopedSemaphore(Semaphore& sem) : m_Semaphore(sem) m_Semaphore.Wait();
ScopedSemaphore(const ScopedSemaphore&) = delete;
~ScopedSemaphore() m_Semaphore.Notify();
ScopedSemaphore& operator=(const ScopedSemaphore&) = delete;
private:
Semaphore& m_Semaphore;
;
多线程应用中的使用示例:
boost::ptr_vector<std::thread> threads;
Semaphore semaphore;
for (...)
...
auto t = new std::thread([..., &semaphore]
ScopedSemaphore scopedSemaphore(semaphore);
...
);
threads.push_back(t);
for (auto& t : threads)
t.join();
【讨论】:
【参考方案9】:我发现 shared_ptr 和 weak_ptr,一个很长的列表,完成了我需要的工作。我的问题是,我有几个客户想要与主机的内部数据进行交互。通常,主机自己更新数据,但是,如果客户端请求它,主机需要停止更新,直到没有客户端访问主机数据。同时,客户端可以请求独占访问,这样其他客户端和主机都不能修改该主机数据。
我是怎么做到的,我创建了一个结构:
struct UpdateLock
typedef std::shared_ptr< UpdateLock > ptr;
;
每个客户都有这样的成员:
UpdateLock::ptr m_myLock;
那么主机将有一个weak_ptr 成员用于排他性,以及一个weak_ptr 列表用于非排他性锁:
std::weak_ptr< UpdateLock > m_exclusiveLock;
std::list< std::weak_ptr< UpdateLock > > m_locks;
有一个函数可以启用锁定,还有一个函数可以检查主机是否被锁定:
UpdateLock::ptr LockUpdate( bool exclusive );
bool IsUpdateLocked( bool exclusive ) const;
我在 LockUpdate、IsUpdateLocked 中测试锁,并定期在主机的更新例程中测试。测试锁就像检查weak_ptr是否过期一样简单,然后从m_locks列表中删除任何过期的东西(我只在主机更新期间这样做),我可以检查列表是否为空;同时,当客户端重置他们挂起的 shared_ptr 时,我会自动解锁,当客户端自动销毁时也会发生这种情况。
总体效果是,由于客户端很少需要排他性(通常仅保留用于添加和删除),大多数时候对 LockUpdate(false) 的请求,即非排他性,只要 (! m_exclusiveLock)。而 LockUpdate( true ),一个排他性请求,只有在 (!m_exclusiveLock) 和 (m_locks.empty()) 都成功时才会成功。
可以添加一个队列来缓解排他锁和非排他锁之间的问题,但是,到目前为止我还没有发生过冲突,所以我打算等到发生这种情况后再添加解决方案(主要是为了进行实际测试条件)。
到目前为止,这可以很好地满足我的需求;我可以想象扩展它的需要,以及扩展使用可能会出现的一些问题,但是,这实现起来很快,并且只需要很少的自定义代码。
【讨论】:
【参考方案10】:与其他答案不同,我提出一个新版本:
-
在被删除之前取消阻塞所有等待的线程。在这种情况下,删除信号量会唤醒所有等待的线程,只有在每个人都醒来后,信号量的析构函数才会退出。
有一个
wait()
调用的参数,用于在超时(以毫秒为单位)过去后自动解锁调用线程。
在构造函数上有一个选项,可将可用资源计数限制为信号量初始化的计数。这样,调用notify()
的次数过多不会增加信号量的资源数量。
#include <stdio.h>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
std::recursive_mutex g_sync_mutex;
#define sync(x) do \
std::unique_lock<std::recursive_mutex> lock(g_sync_mutex); \
x; \
while (false);
class Semaphore
int _count;
bool _limit;
int _all_resources;
int _wakedup;
std::mutex _mutex;
std::condition_variable_any _condition_variable;
public:
/**
* count - how many resources this semaphore holds
* limit - limit notify() calls only up to the count value (available resources)
*/
Semaphore (int count, bool limit)
: _count(count),
_limit(limit),
_all_resources(count),
_wakedup(count)
/**
* Unlock all waiting threads before destructing the semaphore (to avoid their segfalt later)
*/
virtual ~Semaphore ()
std::unique_lock<std::mutex> lock(_mutex);
_wakeup(lock);
void _wakeup(std::unique_lock<std::mutex>& lock)
int lastwakeup = 0;
while( _wakedup < _all_resources )
lock.unlock();
notify();
lock.lock();
// avoids 100% CPU usage if someone is not waking up properly
if (lastwakeup == _wakedup)
std::this_thread::sleep_for( std::chrono::milliseconds(10) );
lastwakeup = _wakedup;
// Mutex and condition variables are not movable and there is no need for smart pointers yet
Semaphore(const Semaphore&) = delete;
Semaphore& operator =(const Semaphore&) = delete;
Semaphore(const Semaphore&&) = delete;
Semaphore& operator =(const Semaphore&&) = delete;
/**
* Release one acquired resource.
*/
void notify()
std::unique_lock<std::mutex> lock(_mutex);
// sync(std::cerr << getTime() << "Calling notify(" << _count << ", " << _limit << ", " << _all_resources << ")" << std::endl);
_count++;
if (_limit && _count > _all_resources)
_count = _all_resources;
_condition_variable.notify_one();
/**
* This function never blocks!
* Return false if it would block when acquiring the lock. Otherwise acquires the lock and return true.
*/
bool try_acquire()
std::unique_lock<std::mutex> lock(_mutex);
// sync(std::cerr << getTime() << "Calling try_acquire(" << _count << ", " << _limit << ", " << _all_resources << ")" << std::endl);
if(_count <= 0)
return false;
_count--;
return true;
/**
* Return true if the timeout expired, otherwise return false.
* timeout - how many milliseconds to wait before automatically unlocking the wait() call.
*/
bool wait(int timeout = 0)
std::unique_lock<std::mutex> lock(_mutex);
// sync(std::cerr << getTime() << "Calling wait(" << _count << ", " << _limit << ", " << _all_resources << ")" << std::endl);
_count--;
_wakedup--;
try
std::chrono::time_point<std::chrono::system_clock> timenow = std::chrono::system_clock::now();
while(_count < 0)
if (timeout < 1)
_condition_variable.wait(lock);
else
std::cv_status status = _condition_variable.wait_until(lock, timenow + std::chrono::milliseconds(timeout));
if ( std::cv_status::timeout == status)
_count++;
_wakedup++;
return true;
catch (...)
_count++;
_wakedup++;
throw;
_wakedup++;
return false;
/**
* Return true if calling wait() will block the calling thread
*/
bool locked()
std::unique_lock<std::mutex> lock(_mutex);
return _count <= 0;
/**
* Return true the semaphore has at least all resources available (since when it was created)
*/
bool freed()
std::unique_lock<std::mutex> lock(_mutex);
return _count >= _all_resources;
/**
* Return how many resources are available:
* - 0 means not free resources and calling wait() will block te calling thread
* - a negative value means there are several threads being blocked
* - a positive value means there are no threads waiting
*/
int count()
std::unique_lock<std::mutex> lock(_mutex);
return _count;
/**
* Wake everybody who is waiting and reset the semaphore to its initial value.
*/
void reset()
std::unique_lock<std::mutex> lock(_mutex);
if(_count < 0)
_wakeup(lock);
_count = _all_resources;
;
打印当前时间戳的实用程序:
std::string getTime()
char buffer[20];
#if defined( WIN32 )
SYSTEMTIME wlocaltime;
GetLocalTime(&wlocaltime);
::snprintf(buffer, sizeof buffer, "%02d:%02d:%02d.%03d ", wlocaltime.wHour, wlocaltime.wMinute, wlocaltime.wSecond, wlocaltime.wMilliseconds);
#else
std::chrono::time_point< std::chrono::system_clock > now = std::chrono::system_clock::now();
auto duration = now.time_since_epoch();
auto hours = std::chrono::duration_cast< std::chrono::hours >( duration );
duration -= hours;
auto minutes = std::chrono::duration_cast< std::chrono::minutes >( duration );
duration -= minutes;
auto seconds = std::chrono::duration_cast< std::chrono::seconds >( duration );
duration -= seconds;
auto milliseconds = std::chrono::duration_cast< std::chrono::milliseconds >( duration );
duration -= milliseconds;
time_t theTime = time( NULL );
struct tm* aTime = localtime( &theTime );
::snprintf(buffer, sizeof buffer, "%02d:%02d:%02d.%03ld ", aTime->tm_hour, aTime->tm_min, aTime->tm_sec, milliseconds.count());
#endif
return buffer;
使用此信号量的示例程序:
// g++ -o test -Wall -Wextra -ggdb -g3 -pthread test.cpp && gdb --args ./test
// valgrind --leak-check=full --show-leak-kinds=all --track-origins=yes --verbose ./test
// procdump -accepteula -ma -e -f "" -x c:\ myexe.exe
int main(int argc, char* argv[])
std::cerr << getTime() << "Creating Semaphore" << std::endl;
Semaphore* semaphore = new Semaphore(1, false);
semaphore->wait(1000);
semaphore->wait(1000);
std::cerr << getTime() << "Auto Unlocking Semaphore wait" << std::endl;
std::this_thread::sleep_for( std::chrono::milliseconds(5000) );
delete semaphore;
std::cerr << getTime() << "Exiting after 10 seconds..." << std::endl;
return 0;
示例输出:
11:03:01.012 Creating Semaphore
11:03:02.012 Auto Unlocking Semaphore wait
11:03:07.012 Exiting after 10 seconds...
使用EventLoop 在一段时间后解锁信号量的额外功能:
std::shared_ptr<std::atomic<bool>> autowait(Semaphore* semaphore, int timeout, EventLoop<std::function<void()>>& eventloop, const char* source)
std::shared_ptr<std::atomic<bool>> waiting(std::make_shared<std::atomic<bool>>(true));
sync(std::cerr << getTime() << "autowait '" << source << "'..." << std::endl);
if (semaphore->try_acquire())
eventloop.enqueue( timeout, [waiting, source, semaphore]
if ( (*waiting).load() )
sync(std::cerr << getTime() << "Timeout '" << source << "'..." << std::endl);
semaphore->notify();
);
else
semaphore->wait(timeout);
return waiting;
Semaphore semaphore(1, false);
EventLoop<std::function<void()>>* eventloop = new EventLoop<std::function<void()>>(true);
std::shared_ptr<std::atomic<bool>> waiting_something = autowait(&semaphore, 45000, eventloop, "waiting_something");
【讨论】:
【参考方案11】:如果有人对原子版本感兴趣,这里是实现。性能预计优于互斥体和条件变量版本。
class semaphore_atomic
public:
void notify()
count_.fetch_add(1, std::memory_order_release);
void wait()
while (true)
int count = count_.load(std::memory_order_relaxed);
if (count > 0)
if (count_.compare_exchange_weak(count, count-1, std::memory_order_acq_rel, std::memory_order_relaxed))
break;
bool try_wait()
int count = count_.load(std::memory_order_relaxed);
if (count > 0)
if (count_.compare_exchange_strong(count, count-1, std::memory_order_acq_rel, std::memory_order_relaxed))
return true;
return false;
private:
std::atomic_int count_0;
;
【讨论】:
我预计性能会很多更糟。这段代码几乎犯了所有可能的错误。作为最明显的例子,假设wait
代码必须循环多次。当它最终解除阻塞时,它将采用所有错误预测分支的母亲,因为 CPU 的循环预测肯定会预测它会再次循环。我可以用这段代码列出更多问题。
这是另一个明显的性能杀手:wait
循环将在其旋转时消耗 CPU 微执行资源。假设它与应该notify
它的线程在同一个物理内核中——它会大大减慢线程的速度。
这里还有一个:在 x86 CPU(当今最流行的 CPU)上,compare_exchange_weak 操作始终是写入操作,即使它失败(如果比较,它会写回读取的相同值)失败)。所以假设两个核心都在同一个信号量的wait
循环中。它们都在全速写入 same 缓存线,这会通过使内核间总线饱和而使其他内核慢下来。
我只是更仔细地查看了您的代码。它执行读取循环而不是写入循环。所以不存在wait
中阻塞的两个线程导致内核间资源饱和的问题。所以它不会犯几乎所有可能的错误——这是一个你没有犯的大而常见的错误。
没有无锁信号量这种东西。无锁的整个想法不是在不使用互斥锁的情况下编写代码,而是编写线程从不阻塞的代码。在这种情况下,信号量的本质是阻塞调用 wait() 函数的线程!以上是关于C++0x 没有信号量?如何同步线程?的主要内容,如果未能解决你的问题,请参考以下文章