Linux/设计模式线程池的实现

Posted zhaocx111222333

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux/设计模式线程池的实现相关的知识,希望对你有一定的参考价值。


线程池是一种非常典型的生产消费者模型。
我们在利用线程处理任务的时候,线程过多会带来调度开销,有大量的时间用去创建和处理线程,进而影响缓存局部性和整体性能。
所以就有了线程池,它维护着多个线程,等待着监督管理者分配可并发执行的任务。注意要将人物数据和处理方法一起传入进来,避免线程池和数据处理耦合,增大线程池开销。
线程池在网络服务器访问等地方是非常适合的。

整个编写的思路和注意事项都写在注释中了

任务类

里面需要包含数据和数据处理方式

    1 #include <cstdio>                                                                                                                                                                    
    2 #include <iostream>
    3 #include <queue>
    4 #include <unistd.h>
    5 #include <pthread.h>
    6 using namespace std;
    7 #define MAX_QUEUE   10
    8 #define MAX_THREAD  5
    9 
   10 typedef void (*handler_t)(int data);
   11 //定义一个任务类,里面存放数据和数据的处理方法,
   12 //到时候将任务传入线程池的任务队列,线程池的线程自动去执行类的处理方法run
   13 class ThreadTask{
   14     private:
   15         int _data;//要处理的数据
   16         handler_t _handler;//处理方式的接口
   17     public:
   18         ThreadTask() {}//默认构造
   19         ThreadTask(int data, handler_t handler)//通过构造函数传入数据和函数
   20       :_data(data)
   21       , _handler(handler){}
   22         void SetTask(int data, handler_t handler) {//通过函数传入数据和函数
   23             _data = data;
   24             _handler = handler;
   25         }
   26         void Run() {//线程池需要执行的函数
   27             return _handler(_data);
   28         }
   29 };

线程安全的结构

通过条件变量和信号量实现线程安全的任务队列
要注意里面的区别:
条件变量获取资源条件判断需要自己写,并且需要搭配互斥锁
信号量则不需要

   30 //线程安全的任务队列
   31 //1.利用条件变量和互斥锁以及任务队列实现
   32 //此时的获取资源条件判断需要自己写_queue.size() == _capacity  _queue.empty()
   33 class BlockQueue{
   34     private:
   35         int _capacity;//容量
   36         std::queue<ThreadTask> _queue;
   37         pthread_mutex_t _mutex;
   38         pthread_cond_t _cond_pro;
   39         pthread_cond_t _cond_cus;
   40     public:
   41         BlockQueue(int cap = MAX_QUEUE):_capacity(cap) {
   42             pthread_mutex_init(&_mutex, NULL);
   43             pthread_cond_init(&_cond_pro, NULL);
   44             pthread_cond_init(&_cond_cus, NULL);
   45         }
   46         ~BlockQueue() {
   47             pthread_mutex_destroy(&_mutex);
   48             pthread_cond_destroy(&_cond_pro);
   49             pthread_cond_destroy(&_cond_cus);
   50         }
   51         bool Push(ThreadTask &data) {//数据类型为任务类
   52             pthread_mutex_lock(&_mutex);
W> 53             while (_queue.size() == _capacity) {
   54                 pthread_cond_wait(&_cond_pro, &_mutex);
   55             }
   56             _queue.push(data);
   57             pthread_cond_signal(&_cond_cus);
   58             pthread_mutex_unlock(&_mutex);
   59             return true;
   60         }
   61         bool Pop(ThreadTask *data) {
   62             pthread_mutex_lock(&_mutex);
   63             while (_queue.empty()) {//若没有任务线程阻塞
   64                 pthread_cond_wait(&_cond_cus, &_mutex);
   65             }
   66             *data = _queue.front();
   67             _queue.pop();
   68             pthread_cond_signal(&_cond_pro);
   69             pthread_mutex_unlock(&_mutex);
   70             return true;
   71         }
   72 };
   73 //2.利用信号量实现
   74 //此时的条件判断自身计数,并且也不需要互斥锁
   75 
   76 /*class RingQueue{
   77 private:
   78   vector<int> arr;
   79   int _start;
   80   int _end;
   81   int _capacity;
   82   sem_t _lock;
   83   sem_t _data;
   84   sem_t _idle;
   85 
   86 public:
   87   RingQueue(int cap = MAX_CAP)
   88     :_start(0)
   89     , _end(0)
   90     , _capacity(cap)
   91   {
   92     sem_init(&_lock, 0, 1);
   93     sem_init(&_data, 0, 0);
   94     sem_init(&_idle, 0, cap);
   95   }
   96 
   97   ~RingQueue(){
   98     sem_destroy(&_lock);
   99     sem_destroy(&_idle);
  100     sem_destroy(&_data);
  101   }
  102 
  103   void push(int data){
  104     sem_wait(&_idle);
  105     sem_wait(&_lock);
  106     arr[_start++] = data;
  107     _start = _capacity%_start;
  108     sem_post(&_lock);
  109     sem_post(&_data);
  110   }
  111   void pop(int* data){
  112     sem_wait(&_data);
  113     sem_wait(&_lock);
  114     arr[_end++] = *data;
  115     _start = _capacity%_start;
  116     sem_post(&_lock);
  117     sem_post(&_idle);
  118   }
  119 };*/

线程池类

由很多线程和一个线程安全的任务队列组成

123 class ThreadPool
  124 {                                                                                                                                                                                                                                                                                  
  125     public:
  126     ThreadPool(int tnum = MAX_THREAD, //默认构造--在创建线程池时需要给出最大线程数和最大任务数(参数)--目的是防止资源被耗尽
  127       int qnum = MAX_QUEUE)
  128       :_thread_num(tnum)//初始化列表(利用传递的任务数量和线程数量确定线程池)
W>129       , _queue(qnum)
  130       , _queue_num(qnum){}
  131   bool init(){
  132       //初始化以及创建线程
  133       int ret;
  134       pthread_t tid;
  135       for (int i = 0; i < _thread_num; i++) {
  136         ret = pthread_create(&tid, NULL,
  137           thr_entry, (void*)this);//这里传的是this指针
  138         if (ret != 0) {
  139           std::cout << "thread create error\\n";
  140           return(-1);//为了让其有差错功能,就单独写一个创建
  141         }
  142         pthread_detach(tid);
  143       }
W>144     }
  145             
  146         
  147         bool Push(ThreadTask &task){//线程池对外不需要pop接口,只需要任务类进入线程池的任务队列
  148           //在static void *thr_entry(void *arg);里就将线程出队了,以为内线程池本身就是消费者
  149             _queue.Push(task);
  150             return true;
  151         }
  152     private:
  153     //线程入口函数,注意是静态成员函数,因为成员函数默认有this指针
  154     //而线程的入口函数必须是指定的void*,只有一个参数
  155     //所已将其设置为static,为了知道是谁调用了线程,所以传递this指针
  156 
  157         static void *thr_entry(void *arg) {//线程入口函数
  158             ThreadPool *p = (ThreadPool*)arg;
  159             while(1) {
  160                 ThreadTask task;
  161                 p->_queue.Pop(&task);//线程将要处理的任务出队
  162         //以为是线程安全的队列,若没有任务则阻塞
  163                 task.Run();//线程执行任务
  164             }
  165             return NULL;
  166         }
  167     private:
  168         int _thread_num;
  169     int _queue_num;
  170         BlockQueue _queue;
  171 };

测试

  173 void test(int data)
  174 {
  175   printf("i am thr:%p-get data:%d, will sleep:%d s\\n",
W>176     pthread_self(), data, data % 5);
  177   sleep(data % 5);
  178 }
  179 int main()
  180 {
  181   ThreadPool pool;//定义并且初始化一个线程池
  182   pool.init();
  183   for (int i = 0; i < 10; i++) {
  184     ThreadTask task(i, test);//定义10个任务类
  185     pool.Push(task);//任务类传入线程池的队列
  186     //此时线程池里的安全队列wait由于有数据传入,解除阻塞完成线程任务
  187   }
  188   while (1) sleep(1);//避免让一个线程处理完全部,好观察
  189   return 0;
  190 }  

在这里插入图片描述

以上是关于Linux/设计模式线程池的实现的主要内容,如果未能解决你的问题,请参考以下文章

linux系统线程池

Linux线程池

Linux线程池的实现

线程池的实现设计

Linux下简单线程池的实现

线程池的毒丸方法实现线程池的配比热切换