[C/C++20]_[初级]_[信号量semaphore]

Posted infoworld

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[C/C++20]_[初级]_[信号量semaphore]相关的知识,希望对你有一定的参考价值。

场景

  1. C++20标准里添加了一个关于线程相关的类:Semaphores, 它在<semaphore>头文件里定义了,那么这个信号类如何使用?

说明

  1. cppreference 里对信号量的描述:
是一种轻量级同步原语,用于约束对共享资源的并发访问。当两者都满足时,信号量可能比条件变量更有效。
  1. 开发工具对C++标准的支持版本[1]里要支持C++20特性要在gcc 版本9vs2019以上才支持。而这个信号在Win323里其实早就存在。而在*ux下系统也有关于信号的C<semaphore>. 可见在C++20支持的比较晚, 这个特性相比互斥量用的比较少.

Win32

 // Create a semaphore with initial and max counts of MAX_SEM_COUNT

    ghSemaphore = CreateSemaphore( 
        NULL,           // default security attributes
        MAX_SEM_COUNT,  // initial count
        MAX_SEM_COUNT,  // maximum count
        NULL);          // unnamed semaphore

Linux

  int sem_init (sem_t *sem, int pshared, unsigned int value);
  int sem_destroy (sem_t *sem);
  sem_t *sem_open (const char *name, int oflag, ...);
  int sem_close (sem_t *sem);
  ...
  1. C++20,对于信号的支持就增加了一个类, 重要方法就两个releaseacquire. 信号是用原子计数器来控制工作线程并发访问共享资源。信号释放release一个任务, 工作线程就获取一个执行任务。直到任务个数为0,调用acquire的工作线程就会进入等待状态.
template<std::ptrdiff_t LeastMaxValue = /* implementation-defined */>
class counting_semaphore;
using binary_semaphore = std::counting_semaphore<1>;

// 重要方法
void release( std::ptrdiff_t update = 1 );
void acquire();
  1. semaphore的特性:

    • 可以指定确定个数的任务同时被执行。通过release(num).
    • 任务被执行时,可执行的个数会自动减一. acrequire递减1
    • mutex,condition_variable不同, mutex需要先进入wait再收到通知才会继续执行,意味着condition_variable 如果先执行,那么mutex会错过唤醒。
    • 线程之间的通讯,线程A准备好资源后会自动通知线程B,线程B就会立即执行。
    • semophore信号的唤醒实现使用的是轻量级的原子实现,没有用到mutex。
  2. semaphore的使用场景:

    • 多线程同时并发执行任务.
    • 用在线程池的线程任务派发到线程。

例子

  1. 项目例子除了cppreference给的binary_semaphore之外,增加了一个多线程抢占式执行指定个数任务的情况。每次都有两个线程并发执行任务。

test-semaphore.cpp

#include <iostream>
#include <thread>
#include <vector>
#include <chrono>
#include <functional>
#include <semaphore>
#include "utils.h"
#include "pthread.h"

using namespace std;
 
// global binary semaphore instances
// object counts are set to zero
// objects are in non-signaled state
binary_semaphore
	smphSignalMainToThread(0),
	smphSignalThreadToMain(0);

//1. 最多支持5个线程
#define MAX_THREAD_COUNT 5
counting_semaphore<MAX_THREAD_COUNT> gThreadFactory(0);
bool gExistThread = false;
std::mutex gMutex;

template<class T,class V>
void LogThreadSafety(T t,V v){
    gMutex.lock();
    log(t,v);
    gMutex.unlock();
}

void ThreadProc()
{	
	// wait for a signal from the main proc
	// by attempting to decrement the semaphore
	smphSignalMainToThread.acquire();
 
	// this call blocks until the semaphore's count
	// is increased from the main proc
 
	cout << "[thread] Got the signal\\n"; // response message
 
	// wait for 3 seconds to imitate some work
	// being done by the thread
	using namespace literals;
	this_thread::sleep_for(3s);
 
	cout << "[thread] Send the signal\\n"; // message
 
	// signal the main proc back
	smphSignalThreadToMain.release();
}

void TestBinarySemaphore()
{
    // create some worker thread
	thread thrWorker(ThreadProc);
 
	cout << "[main] Send the signal\\n"; // message
 
	// signal the worker thread to start working
	// by increasing the semaphore's count
	smphSignalMainToThread.release();

	// wait until the worker thread is done doing the work
	// by attempting to decrement the semaphore's count
	smphSignalThreadToMain.acquire();
 
	cout << "[main] Got the signal\\n"; // response message
	thrWorker.join();
}
 
void DoTaskWork(int threadId)
{
    while (true){
        LogThreadSafety(threadId,"WAIT");
        gThreadFactory.acquire();
        if(gExistThread)
            break;

        // 1。假设执行业务逻辑需要3秒
        LogThreadSafety(threadId,"BEGIN");
        this_thread::sleep_for(3s);
        LogThreadSafety(threadId,"END");
    }
}

void TestCountingSemaphore()
{
    std::vector<std::thread> vt;
    for(auto i  = 0; i < MAX_THREAD_COUNT; ++i)
        vt.push_back(move(thread(bind(&DoTaskWork,i))));

    int count = 10;
    while (count--){
        gThreadFactory.release(2);
        this_thread::sleep_for(5s);
    }
    
    gExistThread = true;
    gThreadFactory.release(MAX_THREAD_COUNT);
    for(auto i  = 0; i < MAX_THREAD_COUNT; ++i)
        vt[i].join();
 
    log("Max",gThreadFactory.max());
}

int main()
{
	TestBinarySemaphore();
    TestCountingSemaphore();
}

utils.h

#ifndef UTILS_H
#define UTILS_H

#include <iostream>
#include <atomic>
#include <typeinfo>

template<typename K,typename T>
void log(K k,T t,const char* newLine = "\\n"){

	std::cout << k << "=";
    std::cout << "[";
	if (std::is_same<bool, T>().value)
		std::cout << std::boolalpha;
	
	std::cout << t;
    std::cout << "]" << newLine;
}

template<typename T>
void print(T t){
    std::cout << t << std::endl;
}

void println(){
    std::cout << std::endl;
}

template<typename K,typename T>
void logHex(K k,T t){
    std::cout << k << "=" << std::hex <<  t << std::endl;
}

template<typename M>
void logMap(M m){
    for(auto& [k,t] : m){
        log(k,t);
    }
}

template<typename M>
void logSet(M m){
    for(auto& one : m){
        print(one);
    }
}

#endif 

输出

[main] Send the signal
[thread] Got the signal
[thread] Send the signal
[main] Got the signal
0=[WAIT]
1=[WAIT]
2=[WAIT]
3=[WAIT]
4=[WAIT]
0=[BEGIN]
1=[BEGIN]
0=[END]
0=[WAIT]
1=[END]
1=[WAIT]
0=[BEGIN]
4=[BEGIN]
0=[END]
4=[END]
0=[WAIT]
4=[WAIT]
2=[BEGIN]
1=[BEGIN]
2=[END]
1=[END]
2=[WAIT]
1=[WAIT]
1=[BEGIN]
0=[BEGIN]
1=[END]
1=[WAIT]
0=[END]
0=[WAIT]
1=[BEGIN]
2=[BEGIN]
1=[END]
1=[WAIT]
2=[END]
2=[WAIT]
2=[BEGIN]
0=[BEGIN]
2=[END]
2=[WAIT]
0=[END]
0=[WAIT]
0=[BEGIN]
2=[BEGIN]
0=[END]
0=[WAIT]
2=[END]
2=[WAIT]
0=[BEGIN]
4=[BEGIN]
0=[END]
0=[WAIT]
4=[END]
4=[WAIT]
1=[BEGIN]
2=[BEGIN]
1=[END]
2=[END]
1=[WAIT]
2=[WAIT]
2=[BEGIN]
1=[BEGIN]
2=[END]
2=[WAIT]
1=[END]
1=[WAIT]
Max=[5]

参考

  1. 开发工具对C++标准的支持版本

  2. cppreference

  3. Win32 using-semaphore-objects

以上是关于[C/C++20]_[初级]_[信号量semaphore]的主要内容,如果未能解决你的问题,请参考以下文章

[C/C++11]_[初级]_[使用正则表达式库regex]

[C/C++]_[初级]_[声明和使用字符串常量和字节常量]

[C/C++11]_[初级]_[获取CPU支持的最合适的核心线程数]

[C/C++11]_[初级]_[获取CPU支持的最合适的核心线程数]

[C/C++11]_[初级]_[获取CPU支持的最合适的核心线程数]

[C/C++11]_[初级]_[获取CPU支持的最合适的核心线程数]