[C/C++20]_[初级]_[信号量semaphore]
Posted infoworld
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[C/C++20]_[初级]_[信号量semaphore]相关的知识,希望对你有一定的参考价值。
场景
- 在
C++20
标准里添加了一个关于线程相关的类:Semaphores
, 它在<semaphore>
头文件里定义了,那么这个信号类如何使用?
说明
- cppreference 里对信号量的描述:
是一种轻量级同步原语,用于约束对共享资源的并发访问。当两者都满足时,信号量可能比条件变量更有效。
- 在
开发工具对C++标准的支持版本
[1]里要支持C++20
特性要在gcc 版本9
或vs2019
以上才支持。而这个信号在Win32
3里其实早就存在。而在*ux
下系统也有关于信号的C
库<semaphore>
. 可见在C++20
支持的比较晚, 这个特性相比互斥量用的比较少.
Win32
// Create a semaphore with initial and max counts of MAX_SEM_COUNT
ghSemaphore = CreateSemaphore(
NULL, // default security attributes
MAX_SEM_COUNT, // initial count
MAX_SEM_COUNT, // maximum count
NULL); // unnamed semaphore
Linux
int sem_init (sem_t *sem, int pshared, unsigned int value);
int sem_destroy (sem_t *sem);
sem_t *sem_open (const char *name, int oflag, ...);
int sem_close (sem_t *sem);
...
- 在
C++20
,对于信号的支持就增加了一个类, 重要方法就两个release
和acquire
. 信号是用原子计数器来控制工作线程并发访问共享资源。信号释放release
一个任务, 工作线程就获取一个执行任务。直到任务个数为0,调用acquire
的工作线程就会进入等待状态.
template<std::ptrdiff_t LeastMaxValue = /* implementation-defined */>
class counting_semaphore;
using binary_semaphore = std::counting_semaphore<1>;
// 重要方法
void release( std::ptrdiff_t update = 1 );
void acquire();
-
semaphore
的特性:- 可以指定确定个数的任务同时被执行。通过
release(num)
. - 任务被执行时,可执行的个数会自动减一.
acrequire
递减1
- 和
mutex,condition_variable
不同,mutex
需要先进入wait
再收到通知才会继续执行,意味着condition_variable
如果先执行,那么mutex
会错过唤醒。 - 线程之间的通讯,线程A准备好资源后会自动通知线程B,线程B就会立即执行。
- semophore信号的唤醒实现使用的是轻量级的原子实现,没有用到mutex。
- 可以指定确定个数的任务同时被执行。通过
-
semaphore
的使用场景:- 多线程同时并发执行任务.
- 用在线程池的线程任务派发到线程。
例子
- 项目例子除了
cppreference
给的binary_semaphore
之外,增加了一个多线程抢占式执行指定个数任务的情况。每次都有两个线程并发执行任务。
test-semaphore.cpp
#include <iostream>
#include <thread>
#include <vector>
#include <chrono>
#include <functional>
#include <semaphore>
#include "utils.h"
#include "pthread.h"
using namespace std;
// global binary semaphore instances
// object counts are set to zero
// objects are in non-signaled state
binary_semaphore
smphSignalMainToThread(0),
smphSignalThreadToMain(0);
//1. 最多支持5个线程
#define MAX_THREAD_COUNT 5
counting_semaphore<MAX_THREAD_COUNT> gThreadFactory(0);
bool gExistThread = false;
std::mutex gMutex;
template<class T,class V>
void LogThreadSafety(T t,V v){
gMutex.lock();
log(t,v);
gMutex.unlock();
}
void ThreadProc()
{
// wait for a signal from the main proc
// by attempting to decrement the semaphore
smphSignalMainToThread.acquire();
// this call blocks until the semaphore's count
// is increased from the main proc
cout << "[thread] Got the signal\\n"; // response message
// wait for 3 seconds to imitate some work
// being done by the thread
using namespace literals;
this_thread::sleep_for(3s);
cout << "[thread] Send the signal\\n"; // message
// signal the main proc back
smphSignalThreadToMain.release();
}
void TestBinarySemaphore()
{
// create some worker thread
thread thrWorker(ThreadProc);
cout << "[main] Send the signal\\n"; // message
// signal the worker thread to start working
// by increasing the semaphore's count
smphSignalMainToThread.release();
// wait until the worker thread is done doing the work
// by attempting to decrement the semaphore's count
smphSignalThreadToMain.acquire();
cout << "[main] Got the signal\\n"; // response message
thrWorker.join();
}
void DoTaskWork(int threadId)
{
while (true){
LogThreadSafety(threadId,"WAIT");
gThreadFactory.acquire();
if(gExistThread)
break;
// 1。假设执行业务逻辑需要3秒
LogThreadSafety(threadId,"BEGIN");
this_thread::sleep_for(3s);
LogThreadSafety(threadId,"END");
}
}
void TestCountingSemaphore()
{
std::vector<std::thread> vt;
for(auto i = 0; i < MAX_THREAD_COUNT; ++i)
vt.push_back(move(thread(bind(&DoTaskWork,i))));
int count = 10;
while (count--){
gThreadFactory.release(2);
this_thread::sleep_for(5s);
}
gExistThread = true;
gThreadFactory.release(MAX_THREAD_COUNT);
for(auto i = 0; i < MAX_THREAD_COUNT; ++i)
vt[i].join();
log("Max",gThreadFactory.max());
}
int main()
{
TestBinarySemaphore();
TestCountingSemaphore();
}
utils.h
#ifndef UTILS_H
#define UTILS_H
#include <iostream>
#include <atomic>
#include <typeinfo>
template<typename K,typename T>
void log(K k,T t,const char* newLine = "\\n"){
std::cout << k << "=";
std::cout << "[";
if (std::is_same<bool, T>().value)
std::cout << std::boolalpha;
std::cout << t;
std::cout << "]" << newLine;
}
template<typename T>
void print(T t){
std::cout << t << std::endl;
}
void println(){
std::cout << std::endl;
}
template<typename K,typename T>
void logHex(K k,T t){
std::cout << k << "=" << std::hex << t << std::endl;
}
template<typename M>
void logMap(M m){
for(auto& [k,t] : m){
log(k,t);
}
}
template<typename M>
void logSet(M m){
for(auto& one : m){
print(one);
}
}
#endif
输出
[main] Send the signal
[thread] Got the signal
[thread] Send the signal
[main] Got the signal
0=[WAIT]
1=[WAIT]
2=[WAIT]
3=[WAIT]
4=[WAIT]
0=[BEGIN]
1=[BEGIN]
0=[END]
0=[WAIT]
1=[END]
1=[WAIT]
0=[BEGIN]
4=[BEGIN]
0=[END]
4=[END]
0=[WAIT]
4=[WAIT]
2=[BEGIN]
1=[BEGIN]
2=[END]
1=[END]
2=[WAIT]
1=[WAIT]
1=[BEGIN]
0=[BEGIN]
1=[END]
1=[WAIT]
0=[END]
0=[WAIT]
1=[BEGIN]
2=[BEGIN]
1=[END]
1=[WAIT]
2=[END]
2=[WAIT]
2=[BEGIN]
0=[BEGIN]
2=[END]
2=[WAIT]
0=[END]
0=[WAIT]
0=[BEGIN]
2=[BEGIN]
0=[END]
0=[WAIT]
2=[END]
2=[WAIT]
0=[BEGIN]
4=[BEGIN]
0=[END]
0=[WAIT]
4=[END]
4=[WAIT]
1=[BEGIN]
2=[BEGIN]
1=[END]
2=[END]
1=[WAIT]
2=[WAIT]
2=[BEGIN]
1=[BEGIN]
2=[END]
2=[WAIT]
1=[END]
1=[WAIT]
Max=[5]
参考
以上是关于[C/C++20]_[初级]_[信号量semaphore]的主要内容,如果未能解决你的问题,请参考以下文章
[C/C++11]_[初级]_[使用正则表达式库regex]
[C/C++]_[初级]_[声明和使用字符串常量和字节常量]
[C/C++11]_[初级]_[获取CPU支持的最合适的核心线程数]
[C/C++11]_[初级]_[获取CPU支持的最合适的核心线程数]