通用线程池的设计和实现[C语言]

Posted 祁峰

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了通用线程池的设计和实现[C语言]相关的知识,希望对你有一定的参考价值。


1 适用场景

    首先,必须明确一点,线程池不是万能的,它有其特定的使用场景。使用线程池是为了减小线程本身的开销对应用性能所产生的影响,但是其前提是线程本身创建、销毁的开销和线程执行任务的开销相比是不可忽略的。如果线程本身创建、销毁的开销对应用程序的性能可以忽略不计,那么使用/不使用线程池对程序的性能并不会有太大的影响。因此,线程池通常适合以下几种场景:

    1)、单位时间内处理的任务频繁,且任务时间较短

    2)、对实时性要求较高。如果接收到任务之后再创建线程,可能无法满足实时性的要求,此时必须使用线程池。

    3)、必须经常面对高突发性事件。比如Web服务器。如果有足球转播,则服务器将产生巨大冲击,此时使用传统方法,则必须不停的大量创建、销毁线程。此时采用动态线程池可以避免这种情况的发生。


2 代码实现

注意事项:因非分离线程在异常退出时,操作系统无法及时回收其占用的内存空间,因此,使用分离线程。但请注意:不要使用pthread_detach()来使线程成为分离线程,而应该通过线程属性(pthread_attr_t)的参数来设置线程为分离线程。否则,当线程退出后,再调用pthread_kill()来判断线程是否还存在时,很可能出现段错误

2.1 头文件

#if !defined(__THREAD_POOL_H__)
#define __THREAD_POOL_H__

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <memory.h>
#include <pthread.h>
#include <sys/types.h>

// 布尔类型
typedef int bool;
#define false (0)
#define true  (1)

/* 线程任务链表 */
typedef struct _thread_worker_t

	void *(*process)(void *arg);  /* 线程处理的任务 */
	void *arg;                    /* 任务接口参数 */
	struct _thread_worker_t *next;/* 下一个节点 */
thread_worker_t;

/* 线程池对象 */
typedef struct

	pthread_mutex_t queue_lock;   /* 队列互斥锁 */
	pthread_cond_t queue_ready;   /* 队列条件锁 */

	thread_worker_t *head;        /* 任务队列头指针 */
	bool isdestroy;               /* 是否已销毁线程 */
	pthread_t *threadid;          /* 线程ID数组 —动态分配空间 */
	int reqnum;                   /* 请求创建的线程个数 */
	int num;                      /* 实际创建的线程个数 */
	int queue_size;               /* 工作队列当前大小 */
thread_pool_t;

/* 函数声明 */
extern int thread_pool_init(thread_pool_t **pool, int num);
extern int thread_pool_add_worker(thread_pool_t *pool, void *(*process)(void *arg), void *arg);
extern int thread_pool_keepalive(thread_pool_t *pool);
extern int thread_pool_destroy(thread_pool_t *pool);

#endif /*__THREAD_POOL_H__*/

2.2 函数实现

/*************************************************************
 **功  能:线程池的初始化
 **参  数:
 **    pool:线程池对象
 **    num :线程池中线程个数
 **返回值:0:成功 !0: 失败
 *************************************************************/
int thread_pool_init(thread_pool_t **pool, int num)

	int idx = 0;

    /* 为线程池分配空间 */
	*pool = (thread_pool_t*)calloc(1, sizeof(thread_pool_t));
	if(NULL == *pool) 
		return -1;
	

    /* 初始化线程池 */
	pthread_mutex_init(&((*pool)->queue_lock), NULL);
	pthread_cond_init(&((*pool)->queue_ready), NULL);
	(*pool)->head = NULL;
	(*pool)->reqnum = num;
	(*pool)->queue_size = 0;
	(*pool)->isdestroy = false;
	(*pool)->threadid = (pthread_t*)calloc(1, num*sizeof(pthread_t));
	if(NULL == (*pool)->threadid) 
		free(*pool);
		(*pool) = NULL;

		return -1;
	

        /* 依次创建线程 */
	for(idx=0; idx<num; idx++) 
		ret = thread_create_detach(*pool, idx);
		if(0 != ret) 
			return -1;
		
		(*pool)->num++;
	

	return 0;
/*************************************************************
 **功  能:将任务加入线程池处理队列
 **参  数:
 **    pool:线程池对象
 **    process:需处理的任务
 **    arg: process函数的参数
 **返回值:0:成功 !0: 失败
 *************************************************************/
int thread_pool_add_worker(thread_pool_t *pool, void *(*process)(void *arg), void *arg)

	thread_worker_t *worker=NULL, *member=NULL;
	
	worker = (thread_worker_t*)calloc(1, sizeof(thread_worker_t));
	if(NULL == worker) 
		return -1;
	

	worker->process = process;
	worker->arg = arg;
	worker->next = NULL;

	pthread_mutex_lock(&(pool->queue_lock));

	member = pool->head;
	if(NULL != member) 
		while(NULL != member->next) member = member->next;
		member->next = worker;
	
	else 
pool->head = worker;pool->queue_size++;pthread_mutex_unlock(&(pool->queue_lock));pthread_cond_signal(&(pool->queue_ready));return 0;

/******************************************************************************
 **函数名称: thread_pool_keepalive
 **功    能: 线程保活
 **输入参数: 
 **       pool: 线程池
 **输出参数: NONE
 **返    回: 0: success !0: failed
 **实现过程:
 **      1. 判断线程是否存在
 **      2. 不存在,说明线程死亡,需重新创建
 ******************************************************************************/
int thread_pool_keepalive(thread_pool_t *pool)

	int idx=0, ret=0;

 	for(idx=0; idx<pool->num; idx++) 
		ret = pthread_kill(pool->thread[idx], 0);
		if(ESRCH == ret) 
			ret = thread_create_detach(pool, idx);
			if(ret < 0) 
				return -1;
			
		
	

	return 0;
/*************************************************************
 **功  能:线程池的销毁
 **参  数:
 **    pool:线程池对象
 **返回值:0:成功 !0: 失败
 *************************************************************/
int thread_pool_destroy(thread_pool_t *pool)

	int idx = 0;
	thread_worker_t *member = NULL;

	if(false != pool->isdestroy) 
		return -1;
	

	pool->isdestroy = true;

	pthread_cond_broadcast(&(pool->queue_ready));
	for(idx=0; idx<pool->num; idx++) 
		ret = pthread_kill(pool->threadid[idx], 0);
		if(ESRCH == ret) 
			continue;
		
		else 
			idx--;
			sleep(1);
		
	

	free(pool->threadid);
	pool->threadid = NULL;

	while(NULL != pool->head) 
		member = pool->head;
		pool->head = member->next;
		free(member);
	

	pthread_mutex_destroy(&(pool->queue_lock));
	pthread_cond_destroy(&(pool->queue_ready));
	free(pool);
	
	return 0;

/******************************************************************************
 **函数名称: thread_create_detach
 **功    能: 创建分离线程
 **输入参数: 
 **       pool: 线程池
 **       idx: 线程索引号
 **输出参数: NONE
 **返    回: 0: success !0: failed
 ******************************************************************************/
static int thread_create_detach(thread_pool_t *pool, int idx)

	int ret = 0;
	pthread_attr_t attr;

	do 
		ret = pthread_attr_init(&attr);
		if(0 != ret) 
			return -1;
		
		
		ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
		if(0 != ret) 
			return -1;
		
		
		ret = pthread_create(&((*pool)->threadid[idx]), &attr, thread_routine, *pool);
		if(0 != ret) 
			pthread_attr_destroy(&attr);
			
			if(EINTR == errno) 
				continue;
			
			return -1;
		
		pthread_attr_destroy(&attr);
	while(0);

	return 0;
/*************************************************************
 **功  能:线程池各个线程入口函数
 **参  数:
 **    arg:线程池对象
 **返回值:0:成功 !0: 失败
 *************************************************************/
static void *thread_routine(void *arg)

	thread_worker_t *worker = NULL;
	thread_pool_t *pool = (thread_pool_t*)arg;

	while(1) 
		pthread_mutex_lock(&(pool->queue_lock));
		while((false == pool->isdestroy) && (0 == pool->queue_size)) 
			pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));
		

		if(false != pool->isdestroy) 
			pthread_mutex_unlock(&(pool->queue_lock));
			pthread_exit(NULL);
		

		pool->queue_size--;
		worker = pool->head;
		pool->head = worker->next;
		pthread_mutex_unlock(&(pool->queue_lock);
		/* 执行队列中的任务 */
		(*(worker->process))(worker->arg);

		free(worker);
		worker = NULL;
	

3 函数调用

#define THREAD_MAX_NUM (32)
#define SLEEP	(10)

int myprocess(void *arg)

	fprintf(stdout, "[%s][%d] threadid:%d arg:%d", __FILE__, __LINE__, pthread_self(), *(int*)arg);
	return 0;


int main(void)

	int ret=0, idx=0;
	thread_pool_t *pool = NULL;
	int array[THREAD_MAX_NUM] = 0;

	ret = thread_pool_init(&pool, THREAD_MAX_NUM);
	if(ret < 0) 
		return -1;
	

	for(idx=0; idx<THREAD_MAX_NUM; idx++) 
		array[idx] = idx;
		thread_pool_add_worker(pool, myprocess, &array[idx]); /* 注意:地址各不相同 */
	

	thread_pool_destroy(pool);
	pool = NULL;
	return 0;
}

以上是关于通用线程池的设计和实现[C语言]的主要内容,如果未能解决你的问题,请参考以下文章

Linux下C/C++ 手写一个线程池-

线程池的实现设计

Java实现通用线程池

Linux/设计模式线程池的实现

Linux/设计模式线程池的实现

线程池的设计原理是什么?