C实现线程池

Posted 路之遥_其漫漫

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C实现线程池相关的知识,希望对你有一定的参考价值。

第一部分为头文件

 1 #ifndef __THREADPOOL_H_
 2 #define __THREADPOOL_H_
 3 
 4 typedef struct threadpool_t threadpool_t;
 5 
 6 /**
 7  * @function threadpool_create
 8  * @descCreates a threadpool_t object.
 9  * @param thr_num  thread num
10  * @param max_thr_num  max thread size
11  * @param queue_max_size   size of the queue.
12  * @return a newly created thread pool or NULL
13  */
14 threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);
15 
16 /**
17  * @function threadpool_add
18  * @desc add a new task in the queue of a thread pool
19  * @param pool     Thread pool to which add the task.
20  * @param function Pointer to the function that will perform the task.
21  * @param argument Argument to be passed to the function.
22  * @return 0 if all goes well,else -1
23  */
24 int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg);
25 
26 /**
27  * @function threadpool_destroy
28  * @desc Stops and destroys a thread pool.
29  * @param pool  Thread pool to destroy.
30  * @return 0 if destory success else -1
31  */
32 int threadpool_destroy(threadpool_t *pool);
33 
34 /**
35  * @desc get the thread num
36  * @pool pool threadpool
37  * @return # of the thread
38  */
39 int threadpool_all_threadnum(threadpool_t *pool);
40 
41 /**
42  * desc get the busy thread num
43  * @param pool threadpool
44  * return # of the busy thread
45  */
46 int threadpool_busy_threadnum(threadpool_t *pool);
47 
48 #endif

第二部分为自实现线程池代码(对libevent库进行一些精简,凸显逻辑)

  1 #include <stdlib.h>
  2 #include <pthread.h>
  3 #include <unistd.h>
  4 #include <assert.h>
  5 #include <stdio.h>
  6 #include <string.h>
  7 #include <signal.h>
  8 #include <errno.h>
  9 #include "threadpool.h"
 10 
 11 #define DEFAULT_TIME 10                 /*10s检测一次*/
 12 #define MIN_WAIT_TASK_NUM 10            /*如果queue_size > MIN_WAIT_TASK_NUM 添加新的线程到线程池*/ 
 13 #define DEFAULT_THREAD_VARY 10          /*每次创建和销毁线程的个数*/
 14 #define true 1
 15 #define false 0
 16 
 17 typedef struct {
 18     void *(*function)(void *);          /* 函数指针,回调函数 */
 19     void *arg;                          /* 上面函数的参数 */
 20 } threadpool_task_t;                    /* 各子线程任务结构体 */
 21 
 22 /* 描述线程池相关信息 */
 23 struct threadpool_t {
 24     pthread_mutex_t lock;               /* 用于锁住本结构体 */    
 25     pthread_mutex_t thread_counter;     /* 记录忙状态线程个数de琐 -- busy_thr_num */
 26 
 27     pthread_cond_t queue_not_full;      /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */
 28     pthread_cond_t queue_not_empty;     /* 任务队列里不为空时,通知等待任务的线程 */
 29 
 30     pthread_t *threads;                 /* 存放线程池中每个线程的tid。数组 */
 31     pthread_t adjust_tid;               /* 存管理线程tid */
 32     threadpool_task_t *task_queue;      /* 任务队列(数组首地址) */
 33 
 34     int min_thr_num;                    /* 线程池最小线程数 */
 35     int max_thr_num;                    /* 线程池最大线程数 */
 36     int live_thr_num;                   /* 当前存活线程个数 */
 37     int busy_thr_num;                   /* 忙状态线程个数 */
 38     int wait_exit_thr_num;              /* 要销毁的线程个数 */
 39 
 40     int queue_front;                    /* task_queue队头下标 */
 41     int queue_rear;                     /* task_queue队尾下标 */
 42     int queue_size;                     /* task_queue队中实际任务数 */
 43     int queue_max_size;                 /* task_queue队列可容纳任务数上限 */
 44 
 45     int shutdown;                       /* 标志位,线程池使用状态,true或false */
 46 };
 47 
 48 void *threadpool_thread(void *threadpool);
 49 
 50 void *adjust_thread(void *threadpool);
 51 
 52 int is_thread_alive(pthread_t tid);
 53 int threadpool_free(threadpool_t *pool);
 54 
 55 //threadpool_create(3,100,100);  
 56 threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
 57 {
 58     int i;
 59     threadpool_t *pool = NULL;
 60     do {
 61         if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {  
 62             printf("malloc threadpool fail");
 63             break;                                      /*跳出do while*/
 64         }
 65 
 66         pool->min_thr_num = min_thr_num;
 67         pool->max_thr_num = max_thr_num;
 68         pool->busy_thr_num = 0;
 69         pool->live_thr_num = min_thr_num;               /* 活着的线程数 初值=最小线程数 */
 70         pool->wait_exit_thr_num = 0;
 71         pool->queue_size = 0;                           /* 有0个产品 */
 72         pool->queue_max_size = queue_max_size;
 73         pool->queue_front = 0;
 74         pool->queue_rear = 0;
 75         pool->shutdown = false;                         /* 不关闭线程池 */
 76 
 77         /* 根据最大线程上限数, 给工作线程数组开辟空间, 并清零 */
 78         pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); 
 79         if (pool->threads == NULL) {
 80             printf("malloc threads fail");
 81             break;
 82         }
 83         memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);
 84 
 85         /* 队列开辟空间 */
 86         pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);
 87         if (pool->task_queue == NULL) {
 88             printf("malloc task_queue fail");
 89             break;
 90         }
 91 
 92         /* 初始化互斥琐、条件变量 */
 93         if (pthread_mutex_init(&(pool->lock), NULL) != 0
 94                 || pthread_mutex_init(&(pool->thread_counter), NULL) != 0
 95                 || pthread_cond_init(&(pool->queue_not_empty), NULL) != 0
 96                 || pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
 97         {
 98             printf("init the lock or cond fail");
 99             break;
100         }
101 
102         /* 启动 min_thr_num 个 work thread */
103         for (i = 0; i < min_thr_num; i++) {
104             pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);/*pool指向当前线程池*/
105             printf("start thread 0x%x...\n", (unsigned int)pool->threads[i]);
106         }
107         pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool);/* 启动管理者线程 */
108 
109         return pool;
110 
111     } while (0);
112 
113     threadpool_free(pool);      /* 前面代码调用失败时,释放poll存储空间 */
114 
115     return NULL;
116 }
117 
118 /* 向线程池中 添加一个任务 */
119 //threadpool_add(thp, process, (void*)&num[i]);   /* 向线程池中添加任务 process: 小写---->大写*/
120 
121 int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)
122 {
123     pthread_mutex_lock(&(pool->lock));
124 
125     /* ==为真,队列已经满, 调wait阻塞 */
126     while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {
127         pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
128     }
129     if (pool->shutdown) {
130         pthread_cond_broadcast(&(pool->queue_not_empty));
131         pthread_mutex_unlock(&(pool->lock));
132         return 0;
133     }
134 
135     /* 清空 工作线程 调用的回调函数 的参数arg */
136     if (pool->task_queue[pool->queue_rear].arg != NULL) {
137         pool->task_queue[pool->queue_rear].arg = NULL;
138     }
139     /*添加任务到任务队列里*/
140     pool->task_queue[pool->queue_rear].function = function;
141     pool->task_queue[pool->queue_rear].arg = arg;
142     pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;       /* 队尾指针移动, 模拟环形 */
143     pool->queue_size++;
144 
145     /*添加完任务后,队列不为空,唤醒线程池中 等待处理任务的线程*/
146     pthread_cond_signal(&(pool->queue_not_empty));
147     pthread_mutex_unlock(&(pool->lock));
148 
149     return 0;
150 }
151 
152 /* 线程池中各个工作线程 */
153 void *threadpool_thread(void *threadpool)
154 {
155     threadpool_t *pool = (threadpool_t *)threadpool;
156     threadpool_task_t task;
157 
158     while (true) {
159         /* Lock must be taken to wait on conditional variable */
160         /*刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/
161         pthread_mutex_lock(&(pool->lock));
162 
163         /*queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上, 若有任务,跳过该while*/
164         while ((pool->queue_size == 0) && (!pool->shutdown)) {  
165             printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());
166             pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));
167 
168             /*清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程*/
169             if (pool->wait_exit_thr_num > 0) {
170                 pool->wait_exit_thr_num--;
171 
172                 /*如果线程池里线程个数大于最小值时可以结束当前线程*/
173                 if (pool->live_thr_num > pool->min_thr_num) {
174                     printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
175                     pool->live_thr_num--;
176                     pthread_mutex_unlock(&(pool->lock));
177 
178                     pthread_exit(NULL);
179                 }
180             }
181         }
182 
183         /*如果指定了true,要关闭线程池里的每个线程,自行退出处理---销毁线程池*/
184         if (pool->shutdown) {
185             pthread_mutex_unlock(&(pool->lock));
186             printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
187             pthread_detach(pthread_self());
188             pthread_exit(NULL);     /* 线程自行结束 */
189         }
190 
191         /*从任务队列里获取任务, 是一个出队操作*/
192         task.function = pool->task_queue[pool->queue_front].function;
193         task.arg = pool->task_queue[pool->queue_front].arg;
194 
195         pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;       /* 出队,模拟环形队列 */
196         pool->queue_size--;
197 
198         /*通知可以有新的任务添加进来*/
199         pthread_cond_broadcast(&(pool->queue_not_full));
200 
201         /*任务取出后,立即将 线程池琐 释放*/
202         pthread_mutex_unlock(&(pool->lock));
203 
204         /*执行任务*/ 
205         printf("thread 0x%x start working\n", (unsigned int)pthread_self());
206         pthread_mutex_lock(&(pool->thread_counter));                            /*忙状态线程数变量琐*/
207         pool->busy_thr_num++;                                                   /*忙状态线程数+1*/
208         pthread_mutex_unlock(&(pool->thread_counter));
209 
210         (*(task.function))(task.arg);                                           /*执行回调函数任务*/
211         //task.function(task.arg);                                              /*执行回调函数任务*/
212 
213         /*任务结束处理*/ 
214         printf("thread 0x%x end working\n", (unsigned int)pthread_self());
215         pthread_mutex_lock(&(pool->thread_counter));
216         pool->busy_thr_num--;                                       /*处理掉一个任务,忙状态数线程数-1*/
217         pthread_mutex_unlock(&(pool->thread_counter));
218     }
219 
220     pthread_exit(NULL);
221 }
222 
223 /* 管理线程 */
224 void *adjust_thread(void *threadpool)
225 {
226     int i;
227     threadpool_t *pool = (threadpool_t *)threadpool;
228     while (!pool->shutdown) {
229 
230         sleep(DEFAULT_TIME);                                    /*定时 对线程池管理*/
231 
232         pthread_mutex_lock(&(pool->lock));
233         int queue_size = pool->queue_size;                      /* 关注 任务数 */
234         int live_thr_num = pool->live_thr_num;                  /* 存活 线程数 */
235         pthread_mutex_unlock(&(pool->lock));
236 
237         pthread_mutex_lock(&(pool->thread_counter));
238         int busy_thr_num = pool->busy_thr_num;                  /* 忙着的线程数 */
239         pthread_mutex_unlock(&(pool->thread_counter));
240 
241         /* 创建新线程 算法: 任务数大于最小线程池个数, 且存活的线程数少于最大线程个数时 如:30>=10 && 40<100*/
242         if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) {
243             pthread_mutex_lock(&(pool->lock));  
244             int add = 0;
245 
246             /*一次增加 DEFAULT_THREAD 个线程*/
247             for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY
248                     && pool->live_thr_num < pool->max_thr_num; i++) {
249                 if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) {
250                     pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
251                     add++;
252                     pool->live_thr_num++;
253                 }
254             }
255 
256             pthread_mutex_unlock(&(pool->lock));
257         }
258 
259         /* 销毁多余的空闲线程 算法:忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/
260         if ((busy_thr_num * 2) < live_thr_num  &&  live_thr_num > pool->min_thr_num) {
261 
262             /* 一次销毁DEFAULT_THREAD个线程, 隨機10個即可 */
263             pthread_mutex_lock(&(pool->lock));
264             pool->wait_exit_thr_num = DEFAULT_THREAD_VARY;      /* 要销毁的线程数 设置为10 */
265             pthread_mutex_unlock(&(pool->lock));
266 
267             for (i = 0; i < DEFAULT_THREAD_VARY; i++) {
268                 /* 通知处在空闲状态的线程, 他们会自行终止*/
269                 pthread_cond_signal(&(pool->queue_not_empty));
270             }
271         }
272     }
273 
274     return NULL;
275 }
276 
277 int threadpool_destroy(threadpool_t *pool)
278 {
279     int i;
280     if (pool == NULL) {
281         return -1;
282     }
283     pool->shutdown = true;
284 
285     /*先销毁管理线程*/
286     pthread_join(pool->adjust_tid, NULL);
287 
288     for (i = 0; i < pool->live_thr_num; i++) {
289         /*通知所有的空闲线程*/
290         pthread_cond_broadcast(&(pool->queue_not_empty));
291     }
292     for (i = 0; i < pool->live_thr_num; i++) {
293         pthread_join(pool->threads[i], NULL);
294     }
295     threadpool_free(pool);
296 
297     return 0;
298 }
299 
300 int threadpool_free(threadpool_t *pool)
301 {
302     if (pool == NULL) {
303         return -1;
304     }
305 
306     if (pool->task_queue) {
307         free(pool->task_queue);
308     }
309     if (pool->threads) {
310         free(pool->threads);
311         pthread_mutex_lock(&(pool->lock));
312         pthread_mutex_destroy(&(pool->lock));
313         pthread_mutex_lock(&(pool->thread_counter));
314         pthread_mutex_destroy(&(pool->thread_counter));
315         pthread_cond_destroy(&(pool->queue_not_empty));
316         pthread_cond_destroy(&(pool->queue_not_full));
317     }
318     free(pool);
319     pool = NULL;
320 
321     return 0;
322 }
323 
324 int threadpool_all_threadnum(threadpool_t *pool)
325 {
326     int all_threadnum = -1;
327     pthread_mutex_lock(&(pool->lock));
328     all_threadnum = pool->live_thr_num;
329     pthread_mutex_unlock(&(pool->lock));
330     return all_threadnum;
331 }
332 
333 int threadpool_busy_threadnum(threadpool_t *pool)
334 {
335     int busy_threadnum = -1;
336     pthread_mutex_lock(&(pool->thread_counter));
337     busy_threadnum = pool->busy_thr_num;
338     pthread_mutex_unlock(&(pool->thread_counter));
339     return busy_threadnum;
340 }
341 
342 int is_thread_alive(pthread_t tid)
343 {
344     int kill_rc = pthread_kill(tid, 0);     //发0号信号,测试线程是否存活
345     if (kill_rc == ESRCH) {
346         return false;
347     }
348 
349     return true;
350 }
351 
352 /*测试*/ 
353 
354 #if 1
355 /* 线程池中的线程,模拟处理业务 */
356 void *process(void *arg)
357 {
358     printf("thread 0x%x working on task %d\n ",(unsigned int)pthread_self(),(int)arg);
359     sleep(1);  //小---大写
360     printf("task %d is end\n",(int)arg);
361 
362     return NULL;
363 }
364 
365 int main(void)
366 {
367     /*threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);*/
368 
369     threadpool_t *thp = threadpool_create(3,100,100);   /*创建线程池,池里最小3个线程,最大100,队列最大100*/
370     printf("pool inited");
371 
372     //int *num = (int *)malloc(sizeof(int)*20);
373     int num[20], i;
374     for (i = 0; i < 20; i++) {
375         num[i]=i;
376         printf("add task %d\n",i);
377         threadpool_add(thp, process, (void*)&num[i]);   /* 向线程池中添加任务 */
378     }
379     sleep(10);                                          /* 等子线程完成任务 */
380     threadpool_destroy(thp);
381 
382     return 0;
383 }
384 
385 #endif

=======================

线程池的相关信息:

typedef struct {
void *(*function)(void *); /* 函数指针,回调函数 */
void *arg; /* 上面函数的参数 */
} threadpool_task_t; /* 各子线程任务结构体 */


/* 描述线程池相关信息 */
struct threadpool_t {
pthread_mutex_t lock; /* 用于锁住本结构体 */
pthread_mutex_t thread_counter; /* 记录忙状态线程个数de琐 -- busy_thr_num */

pthread_cond_t queue_not_full; /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */
pthread_cond_t queue_not_empty; /* 任务队列里不为空时,通知等待任务的线程 */

pthread_t *threads; /* 存放线程池中每个线程的tid。数组 */
pthread_t adjust_tid; /* 存管理线程tid */
threadpool_task_t *task_queue; /* 任务队列(数组首地址) */

int min_thr_num; /* 线程池最小线程数 */
int max_thr_num; /* 线程池最大线程数 */
int live_thr_num; /* 当前存活线程个数 */
int busy_thr_num; /* 忙状态线程个数 */
int wait_exit_thr_num; /* 要销毁的线程个数 */

int queue_front; /* task_queue队头下标 */
int queue_rear; /* task_queue队尾下标 */
int queue_size; /* task_queue队中实际任务数 */
int queue_max_size; /* task_queue队列可容纳任务数上限 */

int shutdown; /* 标志位,线程池使用状态,true或false */
};

============================================

查看这段代码的步骤及代码的相关逻辑步骤


1. 大结构体, threadpool_task_t;结构体

2. main

threadpool_create 创建线程池。

for产出任务

threadpool_add 添加任务。

销毁线程池。

3. threadpool_create()

4. threadpool_thread()

跟踪到pthread_cond_wait();阻塞

5. threadpool_add()

跟踪到pthread_cond_signal(); 会到4步中 pthread_cond_wait()继续向后。

6. adjust_thread()

添加10个线程

移除10个线程 ---pthread_exit();

7. threadpool_destroy()

销毁线程池。 ---pthread_exit();

 

以上是关于C实现线程池的主要内容,如果未能解决你的问题,请参考以下文章

可扩/减容线程池C语言原理讲解及代码实现

可扩/减容线程池C语言原理讲解及代码实现

Motan在服务provider端用于处理request的线程池

C语言实现线程池

Linux C 实现一个简单的线程池

C语言-单例模式实现线程池