线程池学习笔记
Posted 专注it
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池学习笔记相关的知识,希望对你有一定的参考价值。
记录一下学习线程池的过程,代码用到的函数归结:
pthread_mutex_lock
pthread_mutex_unlock
pthread_cond_wait
pthread_cond_signal
pthread_cond_broadcast
pthread_create
pthread_join
程序中还用到了链表,
还有一个知识点:任何类型的数据都可以是void类型,
但void类型在使用之前必须进行强制类型转换。
/*
*Author:Greens_Ren
*Description:线程池
*/
/*头文件*/
#include<pthread.h>
#include<stdio.h>
/* begin : add */
#include<stdlib.h>
#include<unistd.h>
#include<assert.h>
#include<sys/types.h>
/* end : add */
/*数据结构*/
typedef struct Thread_worker
{
void *(*worker)(void *arg);
void * arg;
struct Thread_worker *next;
}CThread_worker;
typedef struct Thread_pool
{
pthread_mutex_t queue_lock;
pthread_cond_t queue_ready;
int max_thread_num;
pthread_t *phead_threadid;
int cur_queue_size;
CThread_worker *phead;
int shutdown;
}CThread_pool;
/*全局区*/
static CThread_pool *pool = NULL;
/*函数*/
void pthread_init(int max_thread_num);
void *thread_roution(void * arg);
void pthread_add_worker(void *(*worker)(void *arg), void * arg);
void pthread_destroy(void);
void* my_process(void *arg);
/*main*/
int main(void)
{
int max_thread_num = 3;
int worker_num = 10;
/*初始化*/
pthread_init(max_thread_num);
/*投入任务*/
int *workernum = (int *)malloc(sizeof(int) * worker_num);
int i;
for(i = 0; i < worker_num; i++)
{
workernum[i] = i;
pthread_add_worker(my_process, &workernum[i]);
}
/*等待处理任务*/
sleep(8);
pthread_destroy();
return 0;
}
void pthread_init(int max_thread_num)
{
pool = (CThread_pool*)malloc(sizeof(CThread_pool)); /*free(pool)*/
/* begin : modified 初始化互斥锁和条件变量*/
pthread_mutex_init(&(pool->queue_lock), NULL);
pthread_cond_init(&(pool->queue_ready), NULL);
/* end : modified */
/*初始化线程*/
pool->max_thread_num = max_thread_num;
pool->phead_threadid = (pthread_t *)malloc(sizeof(pthread_t) * max_thread_num); /*free*/
int i;
for (i = 0; i < max_thread_num; i ++)
{
pthread_create(&(pool->phead_threadid[i]), NULL, thread_roution, NULL);
}
/*初始化任务等待队列*/
pool->cur_queue_size = 0;
//pool->phead = (CThread_worker *)malloc(sizeof(CThread_worker));
//pool->phead->next = NULL;
/*罪过,这里考虑多了。在pool中就是存了一个指针,只不过它是指定了一个链表*/
pool->phead = NULL;
/*线程池销毁标记*/
pool->shutdown = 0;
return;
}
void *thread_roution(void * arg)
{
printf("starting thread 0x%x\n", pthread_self());
while(1) /*Added 线程要持续运行,这里考虑使用while(1)来实现*/
{
pthread_mutex_lock(&(pool->queue_lock));
/*如果目前没有任务处理且线程池未销毁就睡眠等待添加任务*/
//if(pool->cur_queue_size == 0 && pool->shutdown != 1)
while(pool->cur_queue_size == 0 && pool->shutdown != 1) /*modified by*/
{
//pthread_cond_wait( &(pool->queue_lock), &(pool->queue_ready) );
printf("thread 0x%x is waiting\n", pthread_self());
pthread_cond_wait( &(pool->queue_ready), &(pool->queue_lock) );
}
/*如果线程池已被标记销毁,那就退出线程*/
if(pool->shutdown == 1)
{
pthread_mutex_unlock(&(pool->queue_lock));
printf("thread 0x%x will exit\n", pthread_self());
pthread_exit(NULL);
}
printf("thread 0x%x is starting to work\n", pthread_self());
/*assert是调试用的好助手,assert如果为家,它就会通过stderr打印错误信息,并终止程序*/
assert(pool->cur_queue_size != 0);
assert(pool->shutdown != 1);
/*开始处理等待队列中的任务*/
/*回调函数*/
pool->cur_queue_size--;
CThread_worker * worker_waiting = pool->phead;
pool->phead = worker_waiting->next;
/*这里加锁的目的就是为了处理任务链表,处理完后就 可以解锁,让其它线程再次去处理任务链表*/
pthread_mutex_unlock(&(pool->queue_lock)); /*modified by 开始的时候讲解锁放在了回调函数后面,这里做修正*/
/*调用回调函数,执行任务*/
(*(worker_waiting->worker))(worker_waiting->arg);
/*删除链表中已经执行过的任务节点*/
free(worker_waiting);
worker_waiting = NULL;
}
}
void pthread_add_worker(void *(*worker)(void *arg), void * arg)
{
assert(worker != NULL);
assert(pool->shutdown != 1);
/*构建新任务插入到任务链表尾部*/
CThread_worker * worker_insert =
(CThread_worker *)malloc(sizeof(CThread_worker));/*free*/
worker_insert->worker = worker;
worker_insert->arg = arg;
worker_insert->next = NULL;
/*下面要处理任务链表了,这里要加锁,保护链表。第一次写的时候就忘记了加锁*/
pthread_mutex_lock(&(pool->queue_lock));
CThread_worker* phead_worker = pool->phead;
/*链表的头指针一开始其实就是空指针,所以如果为NULL,就直接将构建的节点地址赋值给它就可以了*/
if(phead_worker != NULL)
{
while( phead_worker->next != NULL )
{
phead_worker = phead_worker->next;
}
phead_worker->next = worker_insert; /*这里将新构建的任务节点添加到了等待任务链表的结尾*/
}
else
{
pool->phead = worker_insert;
}
assert(pool->phead != NULL);/*这里检查一下等待链表不为空*/
pool->cur_queue_size++; /*同步修改等待队列的长度信息*/
pthread_mutex_unlock(&(pool->queue_lock));
/*等待队列中添加了新任务,这里就要唤醒线程去处理,如果无线程睡眠,这条语句就无效*/
pthread_cond_signal(&(pool->queue_ready));
return;
}
void pthread_destroy(void)
{
if(pool->shutdown)
{
return; /*这里防止多次调用*/
}
/*这里先将销毁线程池标记置位*/
pool->shutdown = 1;
/*唤醒所以等待线程,线程池要销毁了*/
pthread_cond_broadcast(&(pool->queue_ready));
/*阻塞等待线程线程退出,否则就成了僵尸了*/
int i;
for (i = 0; i < pool->max_thread_num; i ++)
{
pthread_join(pool->phead_threadid[i], NULL);
}
/*释放线程号存储占用资源*/
free(pool->phead_threadid);
pool->phead_threadid = NULL;
/*释放任务等待队列*/
CThread_worker *pworker_del = NULL;
while(pool->phead != NULL)
{
pworker_del = pool->phead;
pool->phead = pool->phead->next;
free(pworker_del);
pworker_del = NULL;
}
/*销毁条件变量和互斥锁,刚开始也忘记了*/
pthread_mutex_destroy(&(pool->queue_lock));
pthread_cond_destroy(&(pool->queue_ready));
/*释放线程池*/
free(pool);
pool = NULL;
return;
}
void *my_process(void * arg)
{
printf("thread is 0x%x, working ont task %d\n", pthread_self(), *(int *)arg);
sleep(1);
return;
}
以上是关于线程池学习笔记的主要内容,如果未能解决你的问题,请参考以下文章
newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段