简单线程池的实现
Posted chengonghao
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了简单线程池的实现相关的知识,希望对你有一定的参考价值。
1. 什么是线程池
线程池是线程的集合,拥有若干个线程,线程池中的线程一般用于执行大量的且相对短暂的任务。如果一个任务执行的时间很长,那么就不适合放在线程池中处理,比如说一个任务的执行时间跟进程的生命周期是一致的,那么这个线程的处理就没有必要放到线程池中调度,用一个普通线程即可。
线程池中线程的个数太少的话会降低系统的并发量,太多的话又会增加系统的开销。一般而言,线程池中线程的个数与线程的类型有关系,线程的类型分为
1. 计算密集型任务;
2. I/O密集型任务。
计算密集型任务是占用CPU资源的,很少被外界的事件打断,CPU的个数是一定的,所以并发数是一定的,因此线程个数等于CPU的个数时是最高效的。
I/O密集型任务意味着执行期间可能会被I/O中断,也就是说这个线程会被挂起,这时的线程个数应该大于CPU的个数。
线程池的本质是生产者与消费者模型的应用。生产者线程向任务队列中添加任务,一旦队列有任务到来,如果线程池有空闲线程,就唤醒空闲线程来执行任务,如果没有空闲线程,并且线程数没有达到阈值(线程池中线程的最大值),就创建新线程来执行任务。
当任务增加的时候能够动态的增加线程池中线程的数量,直到达到一个阈值。这个阈值就是线程池中线程的最大值。
当任务执行完毕时,能够动态的销毁线程池中的线程池。
2.线程池的实现
我在 Ubuntu 系统下用C语言写的程序(传送门:github),这是一个非常简单的线程池实现,代码量约300行,仅仅说明线程池的工作原理,我会文章最后给出扩展线程池的思路,使之成为一个拥有C/S架构,socket通信的线程池。
目前而言,用到的知识点就两个:
1. pthread_mutex_t:互斥锁;
2. pthread_cond_t:条件变量;
互斥锁和条件变量要配合使用,我在另一篇博客(点击打开链接)里给出了使用方法,有兴趣的童鞋可以去看一下~
我们的小线程池共有五个文件:
1. condition.h:把互斥锁和条件变量组合在一起,形成一个条件结构体,condition.h就是这个结构体的声明;
2. condition.c:与condition.h对应,定义了操作条件结构体的函数;
3. threadpool.h:包含两个结构体,一个是线程控制块(TCB),另一个是线程池结构体,还有三个函数:初始化线程池、销毁线程池、向线程池中添加任务;
4. threadpool.c:threadpool.h的实现;
5. main.c:主函数;
除此之外,我使用autotool编译程序,因此还有两个脚本文件:
1. makefile.am:定义文件之间的依赖关系;
2. build.sh:编译脚本;
接下来让我们深入代码去理解线程池~
Condition.h,为了实现线程同步,普遍的做法是将互斥锁(pthread_mutex_t)和条件变量(pthread_cond_t)配合在一起使用,最好的做法就是让两者组合成一个结构体,pthread_mutex_t 和 pthread_cond_t 同属于 pthread.h 头文件:
/*******************************************************************
* Copyright(c) 2016 Chen Gonghao
* All rights reserved.
*
* chengonghao@yeah.net
******************************************************************/
#ifndef _CONDITION_H_
#define _CONDITION_H_
#include <pthread.h>
/* 将互斥锁和条件变量封装成一个结构体 */
typedef struct condition
{
pthread_mutex_t pmutex ;
pthread_cond_t pcond ;
} condition_t ;
/* 初始化结构体 */
int condition_init( condition_t* cond ) ;
/* 拿到结构体中的互斥锁 */
int condition_lock( condition_t* cond ) ;
/* 释放结构体中的互斥锁 */
int condition_unlock( condition_t* cond ) ;
/* 使消费者线程等待在条件变量上 */
int condition_wait( condition_t* cond ) ;
/* 使消费者线程等待在条件变量上,abstime:等待超时 */
int condition_timedwait( condition_t* cond, const struct timespec* abstime ) ;
/* 生产者线程通知等待在条件变量上的消费者线程 */
int condition_signal( condition_t* cond ) ;
/* 生产者线程向等待在条件变量上的消费者线程广播 */
int condition_broadcast( condition_t* cond ) ;
/* 销毁结构体 */
int condition_destroy( condition_t* cond ) ;
#endif
Condition.c,有8个针对条件结构体的函数,condition.c就是这8个函数的定义,很简单,都是转而调用pthread.h头文件中的库函数。
/*******************************************************************
* Copyright(c) 2016 Chen Gonghao
* All rights reserved.
*
* chengonghao@yeah.net
******************************************************************/
#include "condition.h"
int condition_init( condition_t* cond )
{
int status ;
if ( ( status = pthread_mutex_init( &cond->pmutex, NULL ) ) )
{
return status ;
}
if ( ( status = pthread_cond_init( &cond->pcond, NULL ) ) )
{
return status ;
}
return 0 ;
}
int condition_lock( condition_t* cond )
{
return pthread_mutex_lock( &cond->pmutex) ;
}
int condition_unlock( condition_t* cond )
{
return pthread_mutex_unlock( &cond->pmutex ) ;
}
int condition_wait( condition_t* cond )
{
return pthread_cond_wait( &cond->pcond, &cond->pmutex ) ;
}
int condition_timedwait( condition_t* cond, const struct timespec* abstime )
{
return pthread_cond_timedwait( &cond->pcond, &cond->pmutex, abstime ) ;
}
int condition_signal( condition_t* cond )
{
return pthread_cond_signal( &cond->pcond ) ;
}
int condition_broadcast( condition_t* cond )
{
return pthread_cond_broadcast( &cond->pcond ) ;
}
int condition_destroy( condition_t* cond )
{
int status ;
if ( ( status = pthread_mutex_destroy( &cond->pmutex ) ) )
{
return status ;
}
if ( ( status = pthread_cond_destroy( &cond->pcond ) ) )
{
return status ;
}
return 0 ;
}
Pthreadpool.h:包含两个结构体,线程控制块(TCB)task_t和线程池结构体threadpool,还声明了三个针对线程池的操作:初始化线程池、销毁线程池、向线程池中添加任务。
/*******************************************************************
* Copyright(c) 2016 Chen Gonghao
* All rights reserved.
*
* chengonghao@yeah.net
******************************************************************/
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
#include "condition.h"
/* 线程控制块(task control block),以单向链表的形式组织 TCB */
typedef struct task
{
void *( *run ) ( void* arg ) ; // 线程的执行函数
void *arg ; // 执行函数的参数
struct task* next ; // 指向下一个 TCB
} task_t ;
/* 线程池结构体 */
typedef struct threadpool
{
condition_t ready ; // 条件变量结构体
task_t* first ; // TCB 链表的头指针
task_t* last ; // TCB 链表的尾指针
int counter ; // TCB 的总数量
int idle ; // 空闲 TCB 的个数
int max_threads ; // 最大线程数量
int quit ; // 线程池销毁标志
} threadpool_t ;
/* 初始化线程池 */
void threadpool_init( threadpool_t* pool, int threads ) ;
/* 向线程池中添加任务 */
void threadpool_add_task( threadpool_t* pool, void* ( *run )( void* arg ), void* arg ) ;
/* 销毁线程池 */
void threadpool_destroy( threadpool_t* pool ) ;
#endif
Threadpool.c,共四个函数,对线程池的操作。
/*******************************************************************
* Copyright(c) 2016 Chen Gonghao
* All rights reserved.
*
* chengonghao@yeah.net
******************************************************************/
#include "threadpool.h"
#include <errno.h>
#include <time.h>
/* 线程入口函数 */
void* thread_runtime( void* arg )
{
struct timespec abstime ;
int timeout ;
// 拿到线程池对象
threadpool_t* pool = ( threadpool_t* ) arg ;
while ( 1 )
{
timeout = 0 ;
/************************** 进入临界区 ***********************/
condition_lock( &pool->ready ) ;
// 空闲线程数加 1
++ pool->idle ;
// 如果线程链表为空,而且线程池处于运行状态,那么线程就该等待任务的到来
while ( pool->first == NULL && pool->quit == 0 )
{
printf( "thread 0x%x is waiting\\n", (int)pthread_self() ) ;
clock_gettime( CLOCK_REALTIME, &abstime ) ;
abstime.tv_sec += 2 ;
int status = condition_timedwait( &pool->ready, &abstime ) ;
if ( status == ETIMEDOUT )
{
printf( "thread 0x%x is wait timed out\\n", (int)pthread_self() ) ;
timeout = 1 ;
break ;
}
}
// 如果线程等待超时
if ( timeout && pool->first == NULL )
{
-- pool->counter ; // 那么线程数量减 1
condition_unlock( &pool->ready ) ; // 释放互斥锁
break ; // 跳出 while,注意,break 之后,线程入口函数执行完毕,线程将不复存在
}
// 线程获得任务
-- pool->idle ; // 线程池空闲数减 1
if ( pool->first != NULL )
{
task_t* t = pool->first ; // 从链表头部取出 TCB
pool->first = t->next ; // 指向下一个 TCB
// 执行任务需要一定时间,所以要先解锁
// 以便生产者可以往链表中加入任务
// 以及其他消费者可以等待任务
condition_unlock( &pool->ready ) ;
t->run( t->arg ) ; // 执行任务的回调函数
free( t ) ; // 任务执行完毕,销毁 TCB
condition_lock( &pool->ready ) ;
}
// quit == 1 说明要销毁线程池
if ( pool->quit && pool->first == NULL )
{
-- pool->counter ;
if ( pool->counter == 0 )
{
condition_signal( &pool->ready ) ; // 唤醒等待在条件变量上的主线程
}
condition_unlock( &pool->ready ) ;
break ;
}
condition_unlock( &pool->ready ) ;
/************************** 退出临界区 ***********************/
}
return NULL ;
}
/* 初始化线程池 */
void threadpool_init( threadpool_t* pool, int threads )
{
condition_init( &pool->ready ) ; // 初始化条件变量结构体
pool->first = NULL ; // 设置线程链表头指针
pool->last = NULL ; // 设置线程链表尾指针
pool->counter = 0 ; // 设置线程池当前线程数
pool->idle = 0 ; // 设置线程池当前空闲线程数
pool->max_threads = threads ; // 设置线程池最大线程数
pool->quit = 0 ; // quit = 0,线程池运行状态;quit = 1,线程池销毁状态
}
/* 向线程池中添加线程 */
void threadpool_add_task( threadpool_t* pool, void* (*run)( void* arg ), void* arg )
{
task_t* newtask = ( task_t* ) malloc ( sizeof( task_t ) ) ; // 创建线程控制块
newtask->run = run ; // 设置线程的回调函数
newtask->arg = arg ; // 设置回调函数的参数
newtask->next = NULL ; // 新加入的线程会被添加到链表尾部
/************************** 进入临界区 ***********************/
condition_lock( &pool->ready ) ; // 拿到互斥锁
// 把新创建的 TCB 添加到线程链表中
if ( pool->first == NULL )
{
// 如果线程链表为空,则 TCB 作为链表头部
pool->first = newtask ;
}
else
{
// 如果线程链表不为空,加入到链表尾部
pool->last->next = newtask ;
}
pool->last = newtask ; // 修改链表尾指针
// 如果有空闲线程,那么就唤醒空闲线程
if ( pool->idle > 0 )
{
condition_signal( &pool->ready ) ; // 通知等待在条件变量上的空闲线程
}
else if ( pool->counter < pool->max_threads )
{
// 如果没有空闲线程可用,而且当前线程数量小于线程池的容量,我们就创建一个线程
pthread_t tid ;
pthread_create( &tid, NULL, thread_runtime, pool ) ; // 指定新线程的起始函数为 thread_runtime,把线程池传递给 thread_runtime
++ pool->counter ;
}
condition_unlock( &pool->ready ) ; // 释放互斥锁
/************************** 退出临界区 ***********************/
}
/* 销毁线程池 */
void threadpool_destroy( threadpool_t* pool )
{
if ( pool->quit )
{
return ;
}
/************************** 进入临界区 ***********************/
condition_lock( &pool->ready ) ;
// 设置退出标志为真
pool->quit = 1 ;
// 如果线程池中正在运行着线程,那么我们需要等待线程执行完毕再销毁
if ( pool->counter > 0 )
{
if ( pool->idle > 0 )
{
condition_broadcast( &pool->ready ) ;
}
while ( pool->counter > 0 )
{
condition_wait( &pool->ready ) ; // 主线程(main 函数所在线程)将等待在条件变量上
}
}
condition_unlock( &pool->ready ) ;
/************************** 退出临界区 ***********************/
// 销毁条件变量
condition_destroy( &pool->ready ) ;
}
最后是主函数啦,main.c
/*******************************************************************
* Copyright(c) 2016 Chen Gonghao
* All rights reserved.
*
* chengonghao@yeah.net
******************************************************************/
#include "threadpool.h"
/* 定义线程池最大线程数 */
#define MAX_POOL_SIZE 3
/* 每个任务的回调函数 */
void* mytask( void* arg )
{
printf( " thread 0x%x is working on task %d\\n", (int)pthread_self(), *(int*)arg ) ;
sleep( 1 ) ;
free( arg ) ;
return NULL ;
}
int main( void )
{
threadpool_t pool ; // 定义一个线程池变量
threadpool_init( &pool, MAX_POOL_SIZE ) ; // 初始化线程池
// 向线程池中添加 10 个任务,每个任务的处理函数都是 mytask
for ( int i = 0; i < 10; ++ i )
{
int* arg = (int*)malloc( sizeof( int ) ) ;
*arg = i ;
threadpool_add_task( &pool, mytask, arg ) ;
}
threadpool_destroy( &pool ) ; // 销毁线程池
return 0 ;
}
画个图来理解线程池的原理:
可以发现生产者的代码主要位于 thread_add_task()函数,消费者的代码主要位于 thread_runtime() 函数。
细心的童鞋也许发现了:thread_runtime()中只有while循环,三个消费者同时执行while 循环,不可能保证循环有序的从生产线上取任务,任务来临时如何保证三个消费者不发生争抢行为呢?这就是条件变量的作用了,当消费者发现生产线为空时,就依次睡在条件变量上,睡在条件变量上的关键代码如下:
int status = condition_timedwait( &pool->ready, &abstime ) ;
如果睡超时了就要自动销毁喔~
当任务来临时,生产者有义务通知睡眠的消费者,唤醒后者,起来嗨~,关键代码如下:
condition_signal(&pool->ready ) ; // 唤醒等待在条件变量上的主线程
执行结果如下:
我设置的线程池容量为 2,任务数是 10,因此有 2 个线程处理 10 个任务。
从上图可以发现,两个线程分别是0xaa1de700 和 0xaa9df700,这两个线程依次处理 0 ~ 9 号任务,没有任务处理时,便进入等待状态(is waiting),等待超时的结果就是自动销毁,然后主线程退出,程序结束。
3.扩展线程池
以上就是一个最最小的线程池实现了,这只是一个玩具,仅供了解线程池的实现。但是我们可以把小玩具扩展成一个有模有样的线程池:C/S架构、socket 通信,以下是扩展思路,所有扩展的代码都在:simple_thread_pool/extension/。
客户端:
1. 把 socket 封个包,也就是把原生 socket 封装成一个类,我已经做了这一步,代码在:simple_thread_pool/extension/ossSocket/;
2. 做一个命令工厂类(CommandFactory),根据客户端的输入,生产出一系列的命令对象;
3. 客户端类(Client),其中包含 socket 类和 CommandFactory;
服务器端:
服务器端基本上就是对本文介绍的小线程池的扩展。
1.在main函数中添加监听函数:TCPListener();
2.TCPListener()的代码主要由 threadpool_add_task() 和 threadpool_runtime()的代码构成;
很多基础软件,比如数据库、服务器,底层都跑着一个线程池,要是有时间的话,我会扩展这个小线程池,然后写博客介绍技术实现。
以上是关于简单线程池的实现的主要内容,如果未能解决你的问题,请参考以下文章