Linux/设计模式线程池的实现
Posted zhaocx111222333
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux/设计模式线程池的实现相关的知识,希望对你有一定的参考价值。
线程池是一种非常典型的生产消费者模型。
我们在利用线程处理任务的时候,线程过多会带来调度开销,有大量的时间用去创建和处理线程,进而影响缓存局部性和整体性能。
所以就有了线程池,它维护着多个线程,等待着监督管理者分配可并发执行的任务。注意要将人物数据和处理方法一起传入进来,避免线程池和数据处理耦合,增大线程池开销。
线程池在网络服务器访问等地方是非常适合的。
整个编写的思路和注意事项都写在注释中了
任务类
里面需要包含数据和数据处理方式
1 #include <cstdio>
2 #include <iostream>
3 #include <queue>
4 #include <unistd.h>
5 #include <pthread.h>
6 using namespace std;
7 #define MAX_QUEUE 10
8 #define MAX_THREAD 5
9
10 typedef void (*handler_t)(int data);
11 //定义一个任务类,里面存放数据和数据的处理方法,
12 //到时候将任务传入线程池的任务队列,线程池的线程自动去执行类的处理方法run
13 class ThreadTask{
14 private:
15 int _data;//要处理的数据
16 handler_t _handler;//处理方式的接口
17 public:
18 ThreadTask() {}//默认构造
19 ThreadTask(int data, handler_t handler)//通过构造函数传入数据和函数
20 :_data(data)
21 , _handler(handler){}
22 void SetTask(int data, handler_t handler) {//通过函数传入数据和函数
23 _data = data;
24 _handler = handler;
25 }
26 void Run() {//线程池需要执行的函数
27 return _handler(_data);
28 }
29 };
线程安全的结构
通过条件变量和信号量实现线程安全的任务队列
要注意里面的区别:
条件变量获取资源条件判断需要自己写,并且需要搭配互斥锁
信号量则不需要
30 //线程安全的任务队列
31 //1.利用条件变量和互斥锁以及任务队列实现
32 //此时的获取资源条件判断需要自己写_queue.size() == _capacity _queue.empty()
33 class BlockQueue{
34 private:
35 int _capacity;//容量
36 std::queue<ThreadTask> _queue;
37 pthread_mutex_t _mutex;
38 pthread_cond_t _cond_pro;
39 pthread_cond_t _cond_cus;
40 public:
41 BlockQueue(int cap = MAX_QUEUE):_capacity(cap) {
42 pthread_mutex_init(&_mutex, NULL);
43 pthread_cond_init(&_cond_pro, NULL);
44 pthread_cond_init(&_cond_cus, NULL);
45 }
46 ~BlockQueue() {
47 pthread_mutex_destroy(&_mutex);
48 pthread_cond_destroy(&_cond_pro);
49 pthread_cond_destroy(&_cond_cus);
50 }
51 bool Push(ThreadTask &data) {//数据类型为任务类
52 pthread_mutex_lock(&_mutex);
W> 53 while (_queue.size() == _capacity) {
54 pthread_cond_wait(&_cond_pro, &_mutex);
55 }
56 _queue.push(data);
57 pthread_cond_signal(&_cond_cus);
58 pthread_mutex_unlock(&_mutex);
59 return true;
60 }
61 bool Pop(ThreadTask *data) {
62 pthread_mutex_lock(&_mutex);
63 while (_queue.empty()) {//若没有任务线程阻塞
64 pthread_cond_wait(&_cond_cus, &_mutex);
65 }
66 *data = _queue.front();
67 _queue.pop();
68 pthread_cond_signal(&_cond_pro);
69 pthread_mutex_unlock(&_mutex);
70 return true;
71 }
72 };
73 //2.利用信号量实现
74 //此时的条件判断自身计数,并且也不需要互斥锁
75
76 /*class RingQueue{
77 private:
78 vector<int> arr;
79 int _start;
80 int _end;
81 int _capacity;
82 sem_t _lock;
83 sem_t _data;
84 sem_t _idle;
85
86 public:
87 RingQueue(int cap = MAX_CAP)
88 :_start(0)
89 , _end(0)
90 , _capacity(cap)
91 {
92 sem_init(&_lock, 0, 1);
93 sem_init(&_data, 0, 0);
94 sem_init(&_idle, 0, cap);
95 }
96
97 ~RingQueue(){
98 sem_destroy(&_lock);
99 sem_destroy(&_idle);
100 sem_destroy(&_data);
101 }
102
103 void push(int data){
104 sem_wait(&_idle);
105 sem_wait(&_lock);
106 arr[_start++] = data;
107 _start = _capacity%_start;
108 sem_post(&_lock);
109 sem_post(&_data);
110 }
111 void pop(int* data){
112 sem_wait(&_data);
113 sem_wait(&_lock);
114 arr[_end++] = *data;
115 _start = _capacity%_start;
116 sem_post(&_lock);
117 sem_post(&_idle);
118 }
119 };*/
线程池类
由很多线程和一个线程安全的任务队列组成
123 class ThreadPool
124 {
125 public:
126 ThreadPool(int tnum = MAX_THREAD, //默认构造--在创建线程池时需要给出最大线程数和最大任务数(参数)--目的是防止资源被耗尽
127 int qnum = MAX_QUEUE)
128 :_thread_num(tnum)//初始化列表(利用传递的任务数量和线程数量确定线程池)
W>129 , _queue(qnum)
130 , _queue_num(qnum){}
131 bool init(){
132 //初始化以及创建线程
133 int ret;
134 pthread_t tid;
135 for (int i = 0; i < _thread_num; i++) {
136 ret = pthread_create(&tid, NULL,
137 thr_entry, (void*)this);//这里传的是this指针
138 if (ret != 0) {
139 std::cout << "thread create error\\n";
140 return(-1);//为了让其有差错功能,就单独写一个创建
141 }
142 pthread_detach(tid);
143 }
W>144 }
145
146
147 bool Push(ThreadTask &task){//线程池对外不需要pop接口,只需要任务类进入线程池的任务队列
148 //在static void *thr_entry(void *arg);里就将线程出队了,以为内线程池本身就是消费者
149 _queue.Push(task);
150 return true;
151 }
152 private:
153 //线程入口函数,注意是静态成员函数,因为成员函数默认有this指针
154 //而线程的入口函数必须是指定的void*,只有一个参数
155 //所已将其设置为static,为了知道是谁调用了线程,所以传递this指针
156
157 static void *thr_entry(void *arg) {//线程入口函数
158 ThreadPool *p = (ThreadPool*)arg;
159 while(1) {
160 ThreadTask task;
161 p->_queue.Pop(&task);//线程将要处理的任务出队
162 //以为是线程安全的队列,若没有任务则阻塞
163 task.Run();//线程执行任务
164 }
165 return NULL;
166 }
167 private:
168 int _thread_num;
169 int _queue_num;
170 BlockQueue _queue;
171 };
测试
173 void test(int data)
174 {
175 printf("i am thr:%p-get data:%d, will sleep:%d s\\n",
W>176 pthread_self(), data, data % 5);
177 sleep(data % 5);
178 }
179 int main()
180 {
181 ThreadPool pool;//定义并且初始化一个线程池
182 pool.init();
183 for (int i = 0; i < 10; i++) {
184 ThreadTask task(i, test);//定义10个任务类
185 pool.Push(task);//任务类传入线程池的队列
186 //此时线程池里的安全队列wait由于有数据传入,解除阻塞完成线程任务
187 }
188 while (1) sleep(1);//避免让一个线程处理完全部,好观察
189 return 0;
190 }
以上是关于Linux/设计模式线程池的实现的主要内容,如果未能解决你的问题,请参考以下文章