Linux线程池 | 线程安全的单例模式 | STL智能指针与线程安全 | 读者写者问题

Posted 阿亮joy.

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux线程池 | 线程安全的单例模式 | STL智能指针与线程安全 | 读者写者问题相关的知识,希望对你有一定的参考价值。

​🌠 作者:@阿亮joy.
🎆专栏:《学会Linux》
🎇 座右铭:每个优秀的人都有一段沉默的时光,那段时光是付出了很多努力却得不到结果的日子,我们把它叫做扎根

目录

👉线程池👈

池化技术是一种资源预分配的机制,先将资源申请好,如果用户需要资源,直接就可以将资源交给用户,不需要再去向系统申请,以提高效率。

什么是线程池

线程池:一种线程的使用模式(以空间换时间的形式)。线程过多会带来调度开销,进而影响缓存局部性和整体性能。线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。

线程池的优点

线程池维护着多个线程,等待着监督管理者分配可并发执行的任务,这就避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络 sockets 等的数量。

线程池的应用场景:

  • 需要大量的线程来完成任务,且完成任务的时间比较短。 Web 服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个 Telnet 连接请求,线程池的优点就不明显了。因为 Telnet 会话时间比线程的创建时间大多了。
  • 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
  • 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。

线程池的实现

线程池中包含一个任务队列和多个线程,主线程向任务队列中 Push 任务,而多个线程则将任务队列中的任务 Pop 出来,然后去执行任务。

LockGuard.hpp

  • Mutex 类中封装了外部传入的锁,其 Lock 接口是对传入的锁进行加锁,Unlock 接口是对传入的锁进行解锁。
  • lockGuard 类是以 RAII 的思想来设计了一个锁的守卫的类,该类对象进行构造时会自动进行加锁,进析构时会自动进行解锁,不需要手动进行加锁和解锁,只需要创建 lockGuard 类对象即可。
#pragma once

#include <pthread.h>

class Mutex

public:
    Mutex(pthread_mutex_t* pmtx)
        : _pmtx(pmtx)
    

    ~Mutex()
    

    void Lock()
    
        pthread_mutex_lock(_pmtx);
    

    void Unlock()
    
        pthread_mutex_unlock(_pmtx);
    

private:
    pthread_mutex_t* _pmtx;
;

// RAII风格的加锁方式
// 构造时自动进行加锁
// 析构时自动进行解锁
class lockGuard

public:
    lockGuard(pthread_mutex_t* pmtx)
        : _mtx(pmtx)
    
        _mtx.Lock();
    

    ~lockGuard()
    
        _mtx.Unlock();
    

private:
    Mutex _mtx;
;

Thread.hpp

  • Thread 类对线程进行了封装,其对象构造是需要传入线程的编号 num,线程的执行例程(回调函数)callBack 和执行参数 args。
  • 同时还设计了线程数据 ThreadData,ThreadData 中包含线程执行例程的参数、线程名和线程 ID。如果还需要其他的数据,可以在 ThreadData 中添加。
  • Thread 类主要实现了两个接口 Create 和 Join,Create 是创建一个新的线程,而 Join 是等待线程,将线程占用的资源归还。
#pragma once

#include <string>
#include <pthread.h>
#include <cstdio>

typedef void*(*func_t)(void*);

class ThreadData

public:
    void *_args;        // 线程执行例程的参数
    std::string _name;  // 线程名
    pthread_t _tid;     // 线程ID
;

class Thread

public:
    Thread(int num, func_t callBack, void* args)
        : _func(callBack)
    
        char nameBuffer[64];
        snprintf(nameBuffer, sizeof nameBuffer, "Thread %d", num);
        _data._args = args;
        _data._name = nameBuffer;
    

    // 创建线程
    void Create()
    
        pthread_create(&_data._tid, nullptr, _func, (void*)&_data);
    

    // 等待线程
    void Join()
    
        pthread_join(_data._tid, nullptr);
    

    // 返回线程的名字
    std::string Name()
    
        return _data._name;
    

    ~Thread()
    

private:
    func_t _func;       // 线程的执行例程
    ThreadData _data;   // 线程的属性
;

Task.hpp

#pragma once

#include <iostream>

// 任务类
class Task

public:
    Task() = default;

    Task(int x, int y, char op)
        : _x(x)
        , _y(y)
        , _op(op)
    

    ~Task()
    

    // 处理任务
    void Execute(const std::string ThreadName)
    
        int ret = 0;
        switch(_op)
        
        case '+':
            ret = _x + _y;
            break;
        case '-':
            ret = _x - _y;
            break;
        case '*':
            ret = _x * _y;
            break;
        case '/':
            if(_y == 0)
            
                std::cerr << "Error: Divided By Zero!" << std::endl;
                return;
            
            else
            
                ret = _x / _y;
            
            break;
        case '%':
            if(_y == 0)
            
                std::cerr << "Error: Modular Division By Zero!" << std::endl;
            
            else
            
                ret = _x % _y;
            
            break;
        default:
            std::cerr << "Operation Error!" << std::endl;
            return;
        
        std::cout << "[" << ThreadName << "]: " << _x << " " << _op << " " << _y << " = " << ret << std::endl;
    

private:
    int _x;
    int _y;
    char _op;
;

ThreadPool.hpp

  • 线程池中的任务队列是被多个执行流访问的临界资源,所以我们需要引入互斥锁来保护任务队列。
  • 线程池中的线程是要执行主线程 Push 到任务队列中的任务,如果任务队列中没有任务,那么线程只能进行等待,直至任务队列中有任务。
  • 当某个线程被唤醒时,可能是被广播类的唤醒线程操作唤醒的。而被广播唤醒的若干个线程中,只有个别的线程拿到任务。所以当线程被唤醒时,还需要再次判断是否满足被唤醒的条件,所以应该使用 while 来进行判断,而不是 if,否则将会存在伪唤醒的情况。
  • pthread_cond_broadcast 函数的作用是唤醒所有在该条件变量下等待的线程。当提供给线程使用的资源可能不是很多,不应该唤醒使用 pthread_cond_broadcast 来唤醒全部线程,否则将会引起惊群效应影响效率。所以当资源较少时,应该使用 pthread_cond_signal 唤醒一个线程即可。

  • 线程从任务队列中获取到任务后,该任务就属于当前线程了,其他线程无法拿到该任务。因此任务的处理应该在临界区外,而不是在临界区内处理任务。如果将线程在临界区内处理任务,那么其他线程要等待该线程将任务处理完,才能够进入临界区获取任务,这样就无妨让多个线程并发地执行任务了,那么线程池的意义也不大了。
  • 线程池的构造函数主要做的工作是将互斥锁和条件变量初始化,还需要创建 num 个线程对象 Thread。而创建线程对象需要传入线程编号、线程要执行的历程以及执行例程的参数线程池指针 this。
  • 为什么执行例程 Routine 要加 static 修饰呢?因为类的成员函数的第一个参数默认就是 this 指针,如果 Routine 不加 static 修饰,那么 Routine 将会有两个参数 this 指针和 void* args。这无法满足创建线程时传入返回值为 void*,参数为 void*的函数指针的要求,所以 Routine 需要用 static 修饰。static 修饰的函数属于整个类,而不属于某个类对象,没有隐藏的 this 指针。
  • 为什么创建线程对象 Thread 要传入 this 指针作为执行例程 Routine 的参数呢?因为 Routine 是静态的成员函数,在其内部无法调用非静态成员函数。而我们需要在 Routine 中调用该类的非静态成员函数,如 Pop 获取任务。所以在创建线程需要传入 this 指针,那么在 Routine 内部就可以通过 this 指针来调用非静态成员函数了。
  • 该线程池是模板类,所以任务队列中存储的任务类型是任意的。但不管是何种任务,这些任务都需要有一个 Execute 函数接口,因为处理任务时需要调用该接口。
#pragma once

#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <unistd.h>
#include "LockGuard.hpp"
#include "Thread.hpp"
#include "Task.hpp"

const int g_thread_num = 3;

template <class T>
class ThreadPool

private:
    bool isEmpty() const
    
        return _task_queue.empty();
    

    void Wait()
    
        pthread_cond_wait(&_cond, &_lock);
    

    // 从任务队列中取任务
    void Pop(T &task)
    
        task = _task_queue.front();
        _task_queue.pop();
    

public:
    ThreadPool(int num = g_thread_num)
        : _num(num)
    
        pthread_mutex_init(&_lock, nullptr);
        pthread_cond_init(&_cond, nullptr);
        for (int i = 1; i <= _num; ++i)
        
            _threads.push_back(new Thread(i, Routine, this));
        
    

    ~ThreadPool()
    
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_cond);
        for (auto &iter : _threads)
        
            iter->Join(); // 释放线程所占用的资源
            delete iter;  // 释放new出来的Thread对象
        
    

    // 启动线程池(创建若干个线程)
    void Run()
    
        for (auto &iter : _threads)
        
            iter->Create();
        
    

    // 线程的执行例程
    static void *Routine(void *args)
    
        ThreadData* td = (ThreadData*)args;
        ThreadPool<T>* self = (ThreadPool<T>*)td->_args;
        // 不断从任务队列中获取任务
        while (true)
        
            T task;
            
                lockGuard lockguard(&self->_lock);
                // 以下代码全是临界区
                while (self->isEmpty())
                
                    self->Wait();
                
                // 获取任务
                self->Pop(task);
            
            // 处理任务
            task.Execute(td->_name);
        
        return nullptr;
    

    // 往任务队列中塞任务
    void Push(const T &task)
    
        lockGuard lockguard(&_lock);
        _task_queue.push(task);
        pthread_cond_signal(&_cond);
    

private:
    std::vector<Thread *> _threads; // 保存创建好的线程
    int _num;                       // 线程池中线程的数量
    std::queue<T> _task_queue;      // 任务队列
    pthread_mutex_t _lock;  // 保护任务队列
    pthread_cond_t _cond;   // 确保任务队列中有任务
;

Test.cc

  • 主线程不断地向线程池中的任务队列 Push 任务,线程池的线程从任务队列中获取任务,执行 Execute 接口处理任务。目前我们没有任务的来源,所以我们只能通过生成随机数来模拟任务的来源。等我们学习了网络部分,以后的任务就可以从网络中来。
#include "ThreadPool.hpp"
#include <iostream>
#include <ctime>
#include <cstdlib>
#include <unistd.h>

int main()

    srand((unsigned int)time(nullptr));
    ThreadPool<Task>* tp = new ThreadPool<Task>();
    tp->Run();

    const char* options = "+-*/%";
    while(true)
    
        int x = rand() % 100;
        int y = rand() % 100;
        int index = rand() % 5;
        Task t(x, y, options[index]);
        // 将任务推送到线程池的任务队列中
        tp->Push(t);
        sleep(1);
    

    return 0;


进程启动后,就会有四个线程,分别是一个主线程和线程池中的三个线程。主线程每隔一秒向任务队列中 Push 一个任务,这三个线程只有一个线程能够获取该任务,其他线程都会在等待队列中等待。该线程处理完任务后就会因为任务队列为空而排在队列的尾部,主线程 Push 一个任务后,等待队列头部的线程将会获取到任务并处理,如此周而复始,那么线程处理任务就呈现出一定的顺序性了。

👉日志功能的实现👈

日志在项目开发中是非常重要的一个功能,它可以帮助我们快速地找出程序的错误等。一个完整的日志,至少有一下功能:日志等级、时间和支持用户自定义日志信息(日志内容、文件行、文件名)等。

时间可以通过 gettimeofday 和 localtime 等函数来获取,在这里就不进行讲解了,自行上网查询。如果想让用户可以自定义日志信息,就需要借助可变参数列表,主要有以下函数。大家自行查询,在这就不赘述了。


将可变参数列表格式化打印的函数,如下图所示:

#pragma once

#include <cstdio>
#include <cstdarg>
#include <string>
#include <iostream>
#include <ctime>

// 日志等级
#define DEBUG   0
#define NORMAL  1
#define WARNING 2
#define ERROR   3
#define FATAL   4

#define LOGFILE "./ThreadPool.log"

const char* levelMap[] = 

    "DEBUG",
    "NORMAL",
    "WARNING",
    "ERROR",
    "FATAL"
;

void logMessage(int level, const char* format, ...)

    // 只有定义了DEBUG_SHOW,才会打印debug信息
    // 利用命令行来定义即可,如-D DEBUG_SHOW
#ifndef DEBUG_SHOW
    if(level == DEBUG) return;
#endif

    char stdBuffer[1024];   // 标准部分
    time_t timestamp = time(nullptr);
    // struct tm *localtime = localtime(&timestamp);
    snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", levelMap[level], timestamp);

    char logBuffer[1024];   // 自定义部分
    va_list args;   // va_list就是char*的别名
    va_start(args, format); // va_start是宏函数,让args指向参数列表的第一个位置
    // vprintf(format, args); // 以format形式向显示器上打印参数列表
    vsnprintf(logBuffer, sizeof logBuffer, format, args);

    va_end(args);   // va_end将args弄成nullptr

    FILE* fp = fopen(LOGFILE, "a");
    // printf("%s%s\\n", stdBuffer, logBuffer);
    fprintf(fp, "%s%s\\n", stdBuffer, logBuffer);    // 向文件中写入日志信息
    fclose(fp);

👉线程安全的单例模式👈

什么是单例模式

单例模式是一种创建型设计模式,它保证一个类只有一个实例存在,并且提供一个全局访问点来访问该实例。饿汉方式和懒汉方式是两种常见的单例模式的实现方式。

单例模式的主要特点包括:

  • 只能有一个实例。
  • 全局访问点,方便访问该实例。

在很多服务器开发场景中,经常需要让服务器加载很多的数据 (上百G) 到内存中,此时往往要用一个单例的类来管理这些数据。

饿汉方式

在这种方式下,实例在类加载时就已经创建好了。这种实现方式存在一个明显的缺点,即如果实例一直没有被使用,那么空间被浪费了。不过也有不少的优点,就是简单且没有线程安全问题。饿汉方式实现的单例类在 main 函数执行之前就实例化好了,而 main 函数执行之前不存在多线程,也就不会存在线程安全问题了。

template <typename T>
class Singleton 

private:
	static T data;
public:
	static T* GetInstance() 
	
		return &data;
	
;

只要通过 Singleton 这个包装类来使用 T 对象, 则一个进程中只有一个 T 对象的实例。

懒汉方式

懒汉模式的核心思想就是延迟初始化,只有在第一次访问时才会创建实例。懒汉式单例在第一次使用时才会创建实例,可以避免空间浪费的问题,但是需要注意多线程安全问题。在多线程情况下,如果没有加锁,可能会导致创建多个实例。

template <typename T>
class Singleton 

private:
	static T* inst;
public:
	static T* GetInstance() 
	
		if (inst == nullptr) 
		
			inst = new T();
		 
		return inst;
	
;

懒汉方式存在线程安全问题:第一次调用 GetInstance 的时候, 如果两个线程同时调用, 可能会创建出两份 T 对象的实例,但是后续再次调用,就没有问题了。

线程安全的单例线程池

const int g_thread_num = 3;

template <class T>
class ThreadPool

private:
    ThreadPool(int num = g_thread_num)
        : _num(num)
    
        pthread_mutex_init(&_lock, nullptr);
        

LINUX多线程(线程池,单例模式,线程安全,读者写者模型)

线程池,单例模式,线程安全,读者写者模型

线程池

基本概念

一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

使用场景

  1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
  2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
  3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误.

图解实例


左边的云朵状的我们称之为任务,当我们有需求给服务器传输过去时候,
没有线程池:

服务器回去创建线程。

有线程池:

直接调用线程来执行。
这样线程池的有点就体现李一点,就是省去了创建和销毁的时间。

正常情况下,如果一直去执行那么会浪费线程资源,于是在服务器和线程池中就存在一个任务队列,用户将任务给服务器,服务器给任务队列,线程池通过任务队列来拿数据进行处理,这样就不需要过多的浪费线程池资源。

价值体现

  1. 有任务,立马有线程进行服务,省掉了线程创建的时间。
  2. 有效防止,servers中线程过多,导致系统过载的问题。
  3. 线程池占用资源少,健壮性不强,进程池则相反。

模拟实现线程池

class Task
  public:
    int base;
  public:
    Task()
    Task(int _b):base(_b)

    void Run()
    
      std::cout <<"thread is[" << pthread_self() << "] task run ... done: base# "<< base << " pow is# "<< pow(base,2) << std::endl;
    
    ~Task()
    
;

class ThreadPool
  private:
    std::queue<Task*>q;
    int max_num;
    pthread_mutex_t lock;
    pthread_cond_t cond;
    bool quit;
  public:
    void LockQueue()
    
      pthread_mutex_lock(&lock);
    
    void UnlockQueue()
    
      pthread_mutex_unlock(&lock);
    
    bool IsEmpty()
    
      return q.size()==0;
    
    void ThreadWait()
    
      pthread_cond_wait(&cond,&lock);
    
    void ThreadWake()
    
      pthread_cond_signal(&cond);
    
  public:
    ThreadPool(int _max=NUM):max_num(_max)
    
    static void *Routine(void *arg)
    
      ThreadPool *this_p=(ThreadPool*)arg;
      while(true)
      
        this_p->LockQueue();
        while(true&&this_p->IsEmpty())
        
          this_p->ThreadWait();
        
        Task t;
        if(true&&!this_p->IsEmpty())
        
          this_p->Get(t);
        
        this_p->UnlockQueue();
        t.Run();
      
    
    void ThreadPoolInit()
    
      pthread_mutex_init(&lock, nullptr);
      pthread_cond_init(&cond, nullptr);
      pthread_t t;
      for(int i = 0; i < max_num; i++)
        pthread_create(&t, nullptr, Routine, this);                          
      
    
    void Put(Task&in)
    
      LockQueue();
      q.push(&in);
      UnlockQueue();
      ThreadWake();
    

    void Get(Task&out)
    
      Task* t=q.front();
      q.pop();
      out=*t;
    
    void ThreadQuit()
    
      if(!IsEmpty())
      
        std::cout<<"empty"<<std::endl;
        return;
      
      quit=true;
      ThreadWake();
    
    ~ThreadPool()
    
      pthread_mutex_destroy(&lock);
      pthread_cond_destroy(&cond);
    

;

线程安全的单例模式

基本概念

什么是单例模式

单例模式是一种 “经典的, 常用的, 常考的” 设计模式

什么是设计模式

IT行业这么火, 涌入的人很多. 俗话说林子大了啥鸟都有. 大佬和菜鸡们两极分化的越来越严重. 为了让菜鸡们不太拖大佬的后腿, 于是大佬们针对一些经典的常见的场景, 给定了一些对应的解决方案, 这个就是 设计模式

饿汉实现方式和懒汉实现方式

当我们进入游戏的时候前期的加载工作就是饿汉,因为不可能将所有需要的文件第一次载入,因为时间很长,所以后边就会实现懒汉的方式再你需要使用的时候,在进行加载。

饿汉方式实现单例模式

emplate <typename T>
class Singleton 
static T data;
public:
static T* GetInstance() 
return &data;

;

就是再创建的时候就已经实例化,下次使用直接使用。

懒汉方式实现单例模式

template <typename T>
class Singleton 
static T* inst;
public:
static T* GetInstance() 
if (inst == NULL) 
inst = new T();

return inst;

;

在需要使用的时候再实例化。

STL,智能指针和线程安全

不是.
原因是, STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响.

而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶).

因此 STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全.

智能指针是否是线程安全的

对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题.

对于 shared_ptr, 多个对象需要共用一个引用计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数.

其他常见的各种锁

  • 悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。

  • 乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。

  • CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。

读者写者问题

基本概念

在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地
降低我们程序的效率。那么有没有一种方法,可以专门处理这种多读少写的情况呢? 有,那就是读写锁。

读者写者分析

在这句话中包含着几层关系:
读者和读者之间:共享同一个资源
写者和写者之间:互斥
写者和读者之间:互斥,同步

和生产消费不一样的是,消费者会取走数据而读者不会

伪代码分析

读优先

pthread_rwlock_rdlock()
P(r)
rc=rc+1;
if(rc==1)

P(w);

V(r);
读取数据
P(r)
rc=rc-1;
if(rc==0)

V(w);

V(r);

这里先申请读锁,然后对齐进行P操作将读者+1,如果已经有读者了就不让写者进入了P(w),出去的时候释放V(r)。

读取数据的时候,先P操作申请,知道最后一个读者出去rc==0,将V(w),此时写者就能进入,然后V(r)操作。

以上是关于Linux线程池 | 线程安全的单例模式 | STL智能指针与线程安全 | 读者写者问题的主要内容,如果未能解决你的问题,请参考以下文章

线程池 线程安全的单例模式 饿汉方式 懒汉方式

Linux多线程_(线程池,读者写者,自旋锁)

java 实现线程安全的单例模式

怎么实现一个线程安全的单例模式

Linux多线程_(线程池,单例模式,读者写者问题,自旋锁)

线程安全的单例模式是否真的安全