一个简单线程池的实现

Posted 清水寺扫地僧

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一个简单线程池的实现相关的知识,希望对你有一定的参考价值。



线程池使用场景

线程池拥有若干个线程,用于执行大量的相对短暂的任务

  • 计算密集型任务:线程个数=CPU个数,占用CPU的任务,特点是很少被外部中断。线程个数远多于CPU个数并不合适,因为CPU个数一定,即并发数一定,那么是较少的CPU调度多个线程,造成线程的上下文切换开销较大,所以会降低效率;
  • I/O密集型任务:线程个数>CPU个数


简单线程池的目标

  • 用于执行大量相对短暂的任务;
  • 当任务增加的时候能够动态地增加线程池中线程的数量直到达到一个阈值;
  • 当任务执行完毕的时候,能够动态地销毁线程池中的线程;
  • 该线程池的实现本质也是生产者与消费者模型的应用。生产者线程Dispatcher(生产者)任务队列(产品缓存区) 中添加任务,一旦队列有任务到来(产品缓存区非空),如果有 等待线程(消费者) 就唤醒来执行任务,如果没有等待线程并并且线程数没有达到阈值,就创建新线程来执行任务;


线程池实现


threadpoll.h

//threadpoll.h
#ifndef _THREAD_POLL_H_
#definde _THREAD_POLL_H_

#include "condition.h"

//任务结构体,将任务放入队列由线程池中的线程来执行
typedef struct task
{
	void *(*run)(void *arg); //任务回调函数
	void *arg;				 //回调函数参数
	struct task *next;
} task_t;

//线程池结构体
typedef struct threadpoll
{
	condition_t ready;		 //任务准备就绪或者线程池销毁通知
	task_t *first;			 //任务队列头指针
	tast_t *last;            //任务队列尾指针
	int counter;			 //线程池中当前线程数
	int idle;			     //线程池中当前正在等待任务的线程数
	int max_threads;         //线程池中最大允许的线程数
	int quit;                //销毁线程池的时候置1
} threadpool_t;

//初始化线程池
void threadpool_init(threadpoll_t *pool, int threads);
//往线程池中添加任务
void threadpoll_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg);
//销毁线程池
void threadpoll_destory(threadpoll_t *poll);

#endif /*_THREAD_POOL_H_*/

condition.h

//condition.h
#ifndef _CONDITION_H_
#define _CONDITION_H_
#include <pthread.h>

typedef struct condition 
{
	pthread_mutex_t pmutex;
	pthread_cond_t  pcond;
} condition_t;

int condition_init(condition_t *cond);
int condition_lock(condition_t *cond);
int condition_unlock(condition_t *cond);
int condition_wait(condition_t *cond);
int condition_timedwait(condition_t *cond, const struct timespec *abstime);
int condition_signal(condition_t *cond);
int condition_broadcast(condition_t *cond);
int condition_destory(condition_t *cond);

#endif /*_CONDITION_H_*/

threadpoll.c

#include <string.h>
#include <errno.h>
#include <time.h>
#include "threadpoll.h"

void *thread_routine(void *arg) {
	struct timespec abstime;
	int timeout;
	printf("Thread 0x%x is starting\\n", (int)pthread_self());
	threadpool_t *pool = (threadpool_t*)arg;
	//循环并等待任务队列当中任务不为空的条件变量信号,对任务队列中的任务进行执行处理
	//并按照实际情况,若是等待任务超时,或是收到线程池销毁条件,则线程有秩序退出
	while(1) {
		timeout = 0;
		condition_lock(&pool->ready);
		//当前线程没有处理任务,所以正在等待任务的线程数加一
		pool->idle++;
		//等待队列中有任务到来或者线程池销毁通知
		while(pool->first == NULL && !pool->quit)  {
			printf("Thread 0x%x is waiting\\n", (int)pthread_self());
			//condition_wait(&pool->ready); //没有超时线程退出的版本
			/************添加超时线程退出的版本***************/
			clock_gettime(CLOCK_REALTIME, &abstime);
			abstime.tv_sec += 2; //等待2s若是超时则退出
			int status = condition_timedwait(&pool->ready, &abstime);
			if(status == ETIMEDOUT) {
				printf("Thread 0x%x is waited timed out\\n", (int)pthread_self());
				timeout = 1;
				break;
			}
			/**********************************************/
		}
		//要处理任务了,所以等待任务队列要减一
		pool->idle--;

		//接收到任务需要处理
		if(pool->first != NULL) {
			//从队头取出任务
			task_t *t = pool->first;
			pool->first = t->next;
			//线程运行到这里仍然未解锁所占有的线程池资源
			//而执行任务t可能需要花费很长时间,为了让其它等待线程获取任务
			//让生产者进程添加任务,因此需要先对线程池进行解锁
			condition_lock(&pool->ready);
			t->run(t->arg);
			free(t);
			//任务执行完后再进行加锁
			condition_lock(&pool->ready);
		} 

		//接收到线程池销毁消息,且没有需要执行的任务
		if(pool->quit && pool->first == NULL){
			pool->counter--; //线程池中线程数减一
			if(pool->counter == 0) //说明线程池中线程都已经退出,此时唤醒destory
				condition_signal(&pool->ready);
			
			//跳出循环之前要记得解锁
			condition_unlock(&pool->ready);
			break;
		}

		//等待超时,且任务执行完则线程退出
		if(timeout && pool->first == NULL) {
			pool->counter--; //线程池中线程数减一
			if(pool->counter == 0) //说明线程池中线程都已经退出,此时唤醒destory
				condition_signal(&pool->ready);
			//跳出循环之前要记得解锁
			condition_unlock(&pool->ready);
			break;
		}
		condition_unlock(&pool->ready);
	}
	printf("Thread 0x%x is exiting\\n", (int)pthread_self());
	return NULL;
}

//初始化线程池
void threadpool_init(threadpoll_t *pool, int threads) {
	condition_init(&pool->ready);
	pool->first = NULL;
	pool->last = NULL;
	pool->counter = 0;
	pool->idle = 0;
	pool->max_threads = threads;
	pool->quit = 0;
}

//往线程池中添加任务
void threadpoll_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg) {
	//生成新任务
	tast_t *newtask = (tast_t*)malloc(sizeof(tast_t));
	newtask->run = run;
	newtask->arg = arg;
	newtask->next = NULL;

	//整个线程池是一个共享资源,对其修改的访问应当进行保护
	condition_lock(&pool->ready);
	//将任务添加到队列
	if(poll->first == NULL) pool->first = newtask;
	else pool->last->next = newtask;
	pool->last = newtask;
	
	//如果有等待线程,则唤醒其中一个
	if(pool->idle > 0)
		condition_signal(&pool->ready);
	else if(pool->counter < pool->max_threads) {
		//没有等待线程,且当前线程数不超过最大线程数,则创建一个新的线程
		pthread_t tid;
		pthread_create(&tid, NULL, thread_routine, pool);
		pool->counter++;
	}
	
	condition_unlock(&pool->ready);
}

//销毁线程池,若是任务队列当中仍有任务要等待任务完成后再销毁
void threadpoll_destory(threadpoll_t *poll) {
	if(pool->quit) {
		return;
	}
	condition_lock(&pool->ready);
	pool->quit = 1;
	if(pool->counter > 0) {
		if(pool->idle > 0) condition_broadcast(&pool->ready);
		//处于执行任务状态中的线程,不会受到广播
		//线程池需要等待执行任务状态中的线程全部退出
		while(pool->counter > 0)
			condition_wait(&pool->ready);
	}
	condition_unlock(&pool->ready);
	condition_destory(&pool->ready);
}

condition.c

#include "condition.h"

int condition_init(condition_t *cond) {
	int status;
	if(status = pthread_mutex_init(&cond->pmutex, NULL))
		return status;
	if(status = pthread_cond_init(&cond->pcond, NULL))
		return status;
	return 0;
}
int condition_lock(condition_t *cond) {
	return phtread_mutex_lock(&cond->pmutex);
}
int condition_unlock(condition_t *cond) {
	return pthread_mutex_unlock(&cond->pmutex);
}
int condition_wait(condition_t *cond) {
	return pthread_cond_wait(&cond->pcond, &cond->pmutex);
}
int condition_timedwait(condition_t *cond, const struct timespec *abstime); {
	return pthread_cond_timewait(&cond->pcond, &cond->pmutex, abstime);
}
int condition_signal(condition_t *cond) {
	return pthread_cond_signal(&cond->pcond);
}
int condition_broadcast(condition_t *cond) {
	return pthread_cond_broadcast(&cond->pcond);
}
int condition_destory(condition_t *cond) {
	int status;
	if(status = pthread_mutex_destory(&cond->pmutex))
		return status;
	if(status = pthread_cond_destory(&cond->pcond))
		return statue;
	return 0;
}


main.c

//test.cpp
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include "threadpoll.h"

void* mytask(void *arg) 
{
	printf("Thread 0x%0x is working on task %d\\n", (int)pthread_self(), *(int*)arg);
	sleep(1);
	free(arg);
	return NULL;
}

int main(void) 
{
	threadpoll_t poll;
	threadpoll_init(&pool, 3);  //创建一个三个线程的线程池
	
	for(int i = 0; i < 10; i++) {
		int *arg = (int*)malloc(sizeof(int));
		*arg = i; //防止产生竞态
		threadpoll_add_task(&poll, mytask, arg);
	}
	
	//sleep(15);
	threadpool_destory(&pool);
	return 0;
}

以上是关于一个简单线程池的实现的主要内容,如果未能解决你的问题,请参考以下文章

一个简单线程池的实现

自己实现一个简单的线程池

线程池的简单实现

一个简单线程池的实现---需进一步完善

简单线程池的实现

Linux下简单线程池的实现