简单线程池实现 (C版本)
Posted cp3alai
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了简单线程池实现 (C版本)相关的知识,希望对你有一定的参考价值。
1. 概述
在谈到线程池之前,我们看看并发还有哪几种方式.
多进程,包括一次启动多个进程,然后在进程间传递描述符,或者连接来了以后fork子进程处理.
多线程,同理也可以像多进程一样,要么建立一堆放在那里,要么就有连接以后"即时建立,即时销毁"
我们看上面的方法并没有什么不妥,特殊的场景总是能够派上用场的.
比如一个并发量只有两位数甚至个位数,那么上面的这些方式应付起来也很轻松.但是如果要举一个极端的例子,十万的并发,首先不可能建立十万个进程等着这个并发,也不可能即时创建即时销毁.这些都无形中加重了系统的负担.
考虑一个问题,可能10W个并发,每个连接请求的服务仅仅1秒甚至更短就结束了.以上的方式显然面对这个场景而言,有点粗放了.这个时候,我们需要一个可以应付高并发,并且没有那么吃资源的并发方式.那么线程池也就脱颖而出了.
2. 设计
线程池,顾名思义,就是一个池子,里面放一堆线程.没事的时候待着,个个心怀鬼胎,摩拳擦掌,有事了谁抢到算谁的...
那么线程池该怎么实现呢.
1. 入口
正如上面说的,线程池就是一堆线程在一个池子里面,那么问题来了,该怎么保证这些线程没事的时候安静的待着.简单的mutex是做不到这一点的,需要结合cond才能达到这个效果.
pthread_mutex_lock(pthread_pool->pmutex);
// while避免虚假唤醒
while (pthread_pool->pthread_job->job_num == 0 && 0 == pthread_pool->need_destroy)
{
pthread_cond_wait(pthread_pool->pcond, pthread_pool->pmutex);
}
这样就可以保证线程没事的时候不会折腾了.这里解释一下while的作用,就如注释所说,避免虚假唤醒,就是可能没有pthread_cond_signal或者pthread_cond_broadcast别调用,但是线程还是从pthread_cond_wait返回了,如果没有while这个判断,线程将会一直错下去,这显然不是我们想看到的.
2. 任务分配
毫无疑问,我们得借助回调来实现任务的分配.这个回调函数完全可以模拟线程自身的回调.
// argv指定的地址由线程池释放
typedef struct tag_job
{
thread_cb cb;
void *argv;
struct tag_job *next;
}job_t, *job_pt;
typedef struct
{
int32_t job_num;
job_pt head;
job_pt tail;
}thread_job_t, *thread_job_pt;
对于线程池而言,其任务必然得是一个队列,也可以理解为任务池,任何一个被唤醒的线程就去这个池子里面取一个任务.然后执行.执行完就继续睡大觉.
3. 销毁
先说下我遇到的问题,我今天使用了几种销毁的方式,事实证明都是错的.最后借鉴网络上的一些例子,不过我现在找不到那个例子了...惭愧.
错误a. 直接调用 pthread_cancel,如果一个线程阻塞在cond那里,你无法用这种方式销毁它,如果恰巧它正在工作,然后你成功把它销毁了,还不如没有销毁,因为死锁了.
错误b. 使用pthread_kill发送SIGKILL.然后整个程序就跟着一块挂掉了.最后发现,信号是进程内共享的.如果不想被异常退出,要在主线程中捕获这个信号.然后在线程池也要有相应的信号处理函数.当然了,这个信号绝对不是SIGSTOP和SIGKILL.
错误c. pthread_cond_broadcast后直接释放mutex和cond.要知道,线程池可能并没有主线程退出的快,也就是说,主线程这边已经把cond和mutex释放了,线程池里面还有没成功退出的线程,此时它既可能调用unlock,也可能调用lock,无论那个都会导致程序异常崩溃.
好吧,说下正确的做法,当broad以后,要等待确认所有线程退出以后,再释放cond和mutex.这样才能使程序正常.
在我的线程池里面,我加了一个master,也就是说,如果创建一个10个线程的池子,我的线程池会创建11个线程,第11个线程就是用来保活的,如果线程池中的某个线程以为某个逻辑异常return了.就需要这个线程来重新启动它...
3. 实现
thread.h
/*!****************************************************************************
* Coypright(C) 2014-2024 () technology Co., Ltd
*
* 文件名 : thread.h
* 版本号 : 1.0
* 描 述 : 线程池实现
* 作 者 : cp3alai
* 日 期 : 2016.05.31
*****************************************************************************/
#include <unistd.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <pthread.h>
#include <signal.h>
#include <errno.h>
typedef void (*thread_cb)(void *argv);
// argv指定的地址由线程池释放
typedef struct tag_job
{
thread_cb cb;
void *argv;
struct tag_job *next;
}job_t, *job_pt;
typedef struct
{
int32_t job_num;
job_pt head;
job_pt tail;
}thread_job_t, *thread_job_pt;
int thread_job_init(thread_job_pt *pthread_job);
int thread_job_push(thread_job_pt thread_job, thread_cb cb, void *arg);
job_pt thread_job_pop(thread_job_pt thread_job);
void thread_job_destroy(thread_job_pt thread_job);
typedef struct
{
int32_t need_destroy;
int32_t thread_num;
pthread_t pthread_master;
pthread_t *pthread_id;
thread_job_pt pthread_job;
pthread_cond_t *pcond;
pthread_mutex_t *pmutex;
}thread_pool_t, *thread_pool_pt;
int pthread_create_detach(pthread_t *thread, void *(*start_routine)(void*), void *arg);
void *thread_entry(void *arg);
void *thread_master(void *arg);
int thread_pool_init(thread_pool_t **pthread_pool, int num);
int thread_pool_add(thread_pool_pt pthread_pool, thread_cb, void *arg);
int thread_pool_kill();
int thread_pool_keepalive();
void thread_pool_destroy(thread_pool_pt pthread_pool);
thread.c
/*!****************************************************************************
* Coypright(C) 2014-2024 () technology Co., Ltd
*
* 文件名 : /media/alai/work/workspace/CC++/mrtpoll/thread.c
* 版本号 : 1.0
* 描 述 :
* 作 者 : cp3alai
* 日 期 : 2016.05.31
*****************************************************************************/
#include "thread.h"
int thread_job_init(thread_job_pt *pthread_job)
{
int32_t ret;
if (NULL == pthread_job)
{
ret = -1;
goto ERROR;
}
*pthread_job = (thread_job_pt)malloc(sizeof(thread_job_t));
if (NULL == *pthread_job)
{
ret = -1;
goto ERROR;
}
bzero(*pthread_job, sizeof(thread_job_t));
(*pthread_job)->head = NULL;
(*pthread_job)->tail= NULL;
ret = 0;
ERROR:
return ret;
}
int thread_job_push(thread_job_pt thread_job, thread_cb cb, void *arg)
{
int32_t ret = 0;
job_pt pjob = NULL;
if (NULL == thread_job || NULL == cb)
{
ret = -1;
goto ERROR;
}
pjob = (job_pt)malloc(sizeof(job_t));
if (NULL == pjob)
{
ret = -1;
goto ERROR;
}
pjob->argv = arg;
pjob->cb = cb;
pjob->next = NULL;
if (NULL != thread_job->head)
{
thread_job->tail->next = pjob;
thread_job->tail = pjob;
}
else
{
thread_job->head = pjob;
thread_job->tail = pjob;
}
thread_job->job_num++;
ERROR:
return ret;
}
// 这里返回的job需要使用它的线程释放
job_pt thread_job_pop(thread_job_pt thread_job)
{
job_pt job = NULL;
if (NULL == thread_job)
{
return NULL;
}
if (NULL == thread_job->head)
{
return NULL;
}
job = thread_job->head;
thread_job->head = thread_job->head->next;
thread_job->job_num--;
return job;
}
void thread_job_destroy(thread_job_pt thread_job)
{
if (NULL == thread_job)
{
return;
}
while (NULL != thread_job->head)
{
job_pt pjob = thread_job->head;
thread_job->head = pjob->next;
free(pjob);
pjob = NULL;
}
free(thread_job);
thread_job = NULL;
return;
}
int pthread_create_detach(pthread_t *thread, void *(*start_routine)(void*), void *arg)
{
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, 1);
return pthread_create(thread, &attr, start_routine, arg);
}
void *thread_master(void *arg)
{
if (NULL == arg)
{
return NULL;
}
thread_pool_pt pthread_pool = (thread_pool_pt)arg;
int i;
while (1)
{
for (i = 0; i < pthread_pool->thread_num; i++)
{
if (0 == pthread_pool->pthread_id[i])
{
pthread_create_detach(&pthread_pool->pthread_id[i], thread_entry, (void*)pthread_pool);
}
if ((pthread_kill(pthread_pool->pthread_id[i], 0)) < 0)
{
if (errno == ESRCH)
{
pthread_create_detach(&pthread_pool->pthread_id[i], thread_entry, (void*)pthread_pool);
}
}
sleep(1);
}
}
return NULL;
}
void *thread_entry(void *arg)
{
if (NULL == arg)
{
return NULL;
}
thread_pool_pt pthread_pool = (thread_pool_pt)arg;
while (1)
{
pthread_mutex_lock(pthread_pool->pmutex);
// 貌似这样写也可以避免虚假唤醒
// while (pthread_cond_wait(pthread_pool->pcond, pthread_pool->pmutex))
while (pthread_pool->pthread_job->job_num == 0 && 0 == pthread_pool->need_destroy)
{
pthread_cond_wait(pthread_pool->pcond, pthread_pool->pmutex);
}
// 如果销毁线程,则这里直接退出
if (0 != pthread_pool->need_destroy)
{
pthread_pool->thread_num--;
pthread_mutex_unlock(pthread_pool->pmutex);
break;
}
job_pt pjob = thread_job_pop(pthread_pool->pthread_job);
if (NULL == pjob)
{
continue;
}
pthread_mutex_unlock(pthread_pool->pmutex);
pjob->cb(pjob->argv);
// 用完了要释放...
free(pjob);
pjob = NULL;
}
return NULL;
}
int thread_pool_init(thread_pool_t **pthread_pool, int num)
{
int ret = 0;
int i;
if (NULL == pthread_pool || 0 == num)
{
ret = -1;
goto ERROR;
}
*pthread_pool = (thread_pool_pt)malloc(sizeof(thread_pool_t));
if (NULL == *pthread_pool)
{
ret = -1;
goto ERROR;
}
bzero((*pthread_pool), sizeof(thread_pool_t));
(*pthread_pool)->pmutex = (pthread_mutex_t *)malloc(sizeof(pthread_mutex_t));
if (NULL == (*pthread_pool)->pmutex)
{
ret = -1;
goto ERROR;
}
(*pthread_pool)->pcond = (pthread_cond_t *)malloc(sizeof(pthread_cond_t));
if (NULL == (*pthread_pool)->pcond)
{
ret = -1;
goto ERROR;
}
pthread_mutex_init((*pthread_pool)->pmutex, NULL);
pthread_cond_init((*pthread_pool)->pcond, NULL);
if ((ret = thread_job_init(&(*pthread_pool)->pthread_job)) < 0)
{
ret = -1;
goto ERROR;
}
(*pthread_pool)->pthread_id = (pthread_t*)malloc(num * sizeof(pthread_t));
if (NULL == (*pthread_pool)->pthread_id)
{
ret = -1;
goto ERROR;
}
bzero((*pthread_pool)->pthread_id, sizeof(pthread_t) * num);
(*pthread_pool)->thread_num = num;
for (i = 0; i < num; i++)
{
ret = pthread_create_detach(&(*pthread_pool)->pthread_id[i], thread_entry, (void *)(*pthread_pool));
if (ret < 0)
{
(*pthread_pool)->pthread_id[i] = 0;
}
}
ret = pthread_create_detach(&(*pthread_pool)->pthread_master, thread_master, (void *)(*pthread_pool));
if (ret < 0)
{
ret = -1;
goto ERROR;
}
ret = 0;
ERROR:
if (ret < 0)
{
if (NULL != (*pthread_pool))
{
thread_pool_destroy((*pthread_pool));
}
}
return ret;
}
int thread_pool_add(thread_pool_pt pthread_pool, thread_cb cb, void *arg)
{
int32_t ret = 0;
if (NULL == pthread_pool || NULL == cb)
{
ret = -1;
goto ERROR;
}
if (NULL == pthread_pool->pthread_job)
{
ret = -1;
goto ERROR;
}
pthread_mutex_lock(pthread_pool->pmutex);
ret = thread_job_push(pthread_pool->pthread_job, cb, arg);
if (ret < 0)
{
ret = -1;
goto ERROR;
}
pthread_cond_signal(pthread_pool->pcond);
pthread_mutex_unlock(pthread_pool->pmutex);
ret = 0;
ERROR:
return ret;
}
int thread_pool_kill(thread_pool_pt pthread_pool, int32_t thread_no, int32_t signo)
{
int32_t ret;
int32_t i;
if (NULL == pthread_pool)
{
ret = -1;
goto ERROR;
}
if (0 == thread_no)
{
for (i = 0; i < pthread_pool->thread_num; i++)
{
pthread_kill(pthread_pool->pthread_id[i], signo);
}
}
else
{
ret = pthread_kill(pthread_pool->pthread_id[thread_no], signo);
if (ret < 0)
{
ret = -1;
goto ERROR;
}
}
ret = 0;
ERROR:
return ret;
}
int thread_pool_keepalive(thread_pool_pt pthread_pool)
{
int32_t ret;
int32_t i;
if (NULL == pthread_pool)
{
return -1;
}
for (i = 0; i < pthread_pool->thread_num; i++)
{
ret = pthread_kill(pthread_pool->pthread_id[i], 0);
if (ret < 0)
{
if (errno == ESRCH)
{
pthread_create_detach(&pthread_pool->pthread_id[i], thread_entry, (void *)pthread_pool);
}
}
}
return 0;
}
void thread_pool_destroy(thread_pool_pt pthread_pool)
{
int32_t i;
if (NULL == pthread_pool)
{
return;
}
if (0 != pthread_pool->pthread_master)
{
pthread_cancel(pthread_pool->pthread_master);
}
if (NULL != pthread_pool->pthread_id)
{
pthread_pool->need_destroy = 1;
pthread_cond_broadcast(pthread_pool->pcond);
free(pthread_pool->pthread_id);
pthread_pool->pthread_id = NULL;
}
if (NULL != pthread_pool->pthread_job)
{
thread_job_destroy(pthread_pool->pthread_job);
}
// 等待线程自销毁完毕
while (pthread_pool->thread_num > 0)
{
usleep(0);
}
if (NULL != pthread_pool->pmutex)
{
pthread_mutex_destroy(pthread_pool->pmutex);
free(pthread_pool->pmutex);
pthread_pool->pmutex = NULL;
}
if (NULL != pthread_pool->pcond)
{
pthread_cond_destroy(pthread_pool->pcond);
free(pthread_pool->pcond);
pthread_pool->pcond = NULL;
}
}
main.c
#include <stdio.h>
#include "thread.h"
void thread_routine1(void *argv)
{
fprintf(stderr, "this is thread 1\\n");
return;
}
void thread_routine2(void *argv)
{
fprintf(stderr, "this is thread 2\\n");
return;
}
int main(int argc, char **argv)
{
int32_t ret;
thread_pool_pt pthread_pool;
ret = thread_pool_init(&pthread_pool, 30);
if (ret < 0)
{
return -1;
}
thread_pool_add(pthread_pool, thread_routine1, NULL);
thread_pool_add(pthread_pool, thread_routine2, NULL);
thread_pool_add(pthread_pool, thread_routine2, NULL);
thread_pool_add(pthread_pool, thread_routine2, NULL);
thread_pool_add(pthread_pool, thread_routine2, NULL);
thread_pool_add(pthread_pool, thread_routine2, NULL);
thread_pool_add(pthread_pool, thread_routine1, NULL);
thread_pool_add(pthread_pool, thread_routine1, NULL);
thread_pool_add(pthread_pool, thread_routine1, NULL);
thread_pool_add(pthread_pool, thread_routine1, NULL);
sleep(5);
thread_pool_destroy(pthread_pool);
sleep(5);
return 0;
}
简单的
Makefile
###################################################
### Make with me
###################################################
CC := gcc
TARGET := pool_test
OBJ := main.o \\
thread.o
INCS := -I.
LIBS := -lpthread
CFLAGS := -ggdb3 -O0
$(TARGET) : $(OBJ)
$(CC) -o $(TARGET) $(OBJ) $(CFLAGS) $(INCS) $(LIBS)
.PHONY: clean
clean:
@echo "cleaned"
@rm -rf $(TARGET) $(OBJ)
执行结果如图 :
4. 后记
写到最后,其实这篇博客还有另外一个目的,就是希望各位看到的同学们,如果代码中有问题,请在评论中帮忙指出来.真的是不胜感激了...
以上是关于简单线程池实现 (C版本)的主要内容,如果未能解决你的问题,请参考以下文章
newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段