用Unix / C实现基于自动扩/减容线程池+epoll反应堆检测沉寂用户模型的服务器框架(含源码)
Posted 狱典司
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了用Unix / C实现基于自动扩/减容线程池+epoll反应堆检测沉寂用户模型的服务器框架(含源码)相关的知识,希望对你有一定的参考价值。
用Unix / C实现基于自动扩/减容线程池+epoll反应堆模型的服务器框架
前言
由于作者本人最近在忙期末复习(KPI压力,害),目前暂时不对该代码的各类结构体、函数单独拎出来做分析了,这项工作等到这个月考完试之后在完成!
代码的大部分内容都有比较详细的注释,这一个月内如果有读者有看不懂源码的地方可以在评论区提问,我看到会回复!如有误也欢迎批评指正!
这个基于线程池和epoll反应堆的服务器大概有以下几个特点吧,简要说一下:
- 综合了线程池和epoll反应堆的优点,这个一个月后做详细说明。
- 线程池实现了根据客户端的数目(任务数量)实现自动扩容和瘦身。
- epoll反应堆额外实现了检测沉寂用户的功能,若一个客户端连接在60秒内没有发送任何消息,服务器会主动断开连接。
注意:该源码的线程回调函数只是提供了最基础的大小写转换以测试C/S双端的联通性,可以在线程回调函数do_rw()上做自定义的拓展。
转发必须注明出处!
服务器端源码
文件名:server.c
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <signal.h>
#include <errno.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <time.h>
#include <ctype.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include "helper.h"
#define DEFAULT_TIME 1 /*1s检测一次*/
#define MIN_WAIT_TASK_NUM 2 /*如果queue_size > MIN_WAIT_TASK_NUM 添加新的线程到线程池*/
#define MAX_EVENTS 4096 //epoll监听上限数
#define BUFLEN 4096
#define DEFAULT_THREAD_VARY 5 /*每次创建和销毁线程的个数*/
#define true 1
#define false 0
#define MAXLINE2 4096
#define SERV_PORT2 7777
void recvdata(int fd, int events, void *arg);
void senddata(int fd, int events, void *arg);
typedef struct
void *(*function)(void *); /* 函数指针,回调函数 */
void *arg; /* 上面函数的参数 */
threadpool_task_t; /* 各子线程任务结构体 */
/* 描述线程池相关信息 */
typedef struct threadpool_t
pthread_mutex_t lock; /* 用于锁住本结构体 */
pthread_mutex_t thread_counter; /* 记录忙状态线程个数de琐 -- busy_thr_num */
pthread_cond_t queue_not_full; /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */
pthread_cond_t queue_not_empty; /* 任务队列里不为空时,通知等待任务的线程 */
pthread_t *threads; /* 存放线程池中每个线程的tid。数组 */
pthread_t adjust_tid; /* 存管理线程tid */
threadpool_task_t *task_queue; /* 任务队列(数组首地址) */
int min_thr_num; /* 线程池最小线程数 */
int max_thr_num; /* 线程池最大线程数 */
int live_thr_num; /* 当前存活线程个数 */
int busy_thr_num; /* 忙状态线程个数 */
int wait_exit_thr_num; /* 要销毁的线程个数 */
int queue_front; /* task_queue队头下标 */
int queue_rear; /* task_queue队尾下标 */
int queue_size; /* task_queue队中实际任务数 */
int queue_max_size; /* task_queue队列可容纳任务数上限 */
int shutdown; /* 标志位,线程池使用状态,true或false */
threadpool_t;
void *threadpool_thread(void *threadpool);
int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg);
void *adjust_thread(void *threadpool);
int is_thread_alive(pthread_t tid);
int threadpool_free(threadpool_t *pool);
/* 描述就绪文件描述符相关信息 */
struct myevent_s
int fd; //要监听的文件描述符
int events; //对应的监听事件
void *arg; //泛型参数
void (*call_back)(int fd, int events, void *arg); //回调函数
int status; //是否在监听:1->在红黑树上(监听), 0->不在(不监听)
char buf[BUFLEN];
int len;
long last_active; //记录每次加入红黑树 g_efd 的时间值
int thread_pos; //记录全局数组 struct s_info ts[256] 的下标供线程使用
threadpool_t *thp;
;
struct s_info /* thread action args */
struct sockaddr_in cliaddr;
int connfd;
int fd; /* */
int events; // 这三个参数用于将epoll监听红黑树上的回调函数和线程池的回调函数连接起来
void *arg; /* */
;
int g_efd; //全局变量, 保存epoll_create返回的文件描述符
struct myevent_s g_events[MAX_EVENTS+1]; //自定义结构体类型数组. +1-->listen fd
struct s_info ts[4096]; //用于记录线程
int k = 0;
/* ev.thread_pos = k; 另外启用一个线程准备work
k = (k+1)%4096;
这个k还是要上一个锁
*/
pthread_mutex_t klock;
/*将结构体 myevent_s 成员变量 初始化*/
/* eventset(&g_events[MAX_EVENTS], lfd, acceptconn, &g_events[MAX_EVENTS]); */
void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void *), void *arg)
ev->fd = fd;
ev->call_back = call_back;
ev->events = 0;
ev->arg = arg;
ev->status = 0;
memset(ev->buf, 0, sizeof(ev->buf));
ev->len = 0;
ev->last_active = time(NULL); //调用eventset函数的时间
return;
/* 向 epoll监听的红黑树 添加一个 文件描述符 */
/* eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]); */
void eventadd(int efd, int events, struct myevent_s *ev)
/* 从自定义的结构体指针struct myevent_s *的变量ev中 提取数据到一个可以挂在到epoll监听红黑树上的struct epoll_event变量 epv上 */
struct epoll_event epv = 0, 0;
int op;
epv.data.ptr = ev;
// epv.events = ev->events = events; //EPOLLIN 或 EPOLLOUT
epv.events = EPOLLIN | EPOLLET;
if (ev->status == 0) //已经在红黑树 g_efd 里
op = EPOLL_CTL_ADD; //将其加入红黑树 g_efd, 并将status置1
ev->status = 1;
if (epoll_ctl(efd, op, ev->fd, &epv) < 0) //实际添加/修改
printf("event add failed [fd=%d], events[%d]\\n", ev->fd, events);
else
printf("event add OK [fd=%d], op=%d, events[%0X]\\n", ev->fd, op, events);
return ;
/* 从epoll 监听的 红黑树中删除一个 文件描述符*/
void eventdel(int efd, struct myevent_s *ev)
struct epoll_event epv = 0, 0;
if (ev->status != 1) //不在红黑树上
return ;
//epv.data.ptr = ev;
epv.data.ptr = NULL; //抹去指针
ev->status = 0; //修改状态
epoll_ctl(efd, EPOLL_CTL_DEL, ev->fd, &epv); //从红黑树 efd 上将 ev->fd 摘除
return ;
/* 当有文件描述符就绪, epoll返回, 调用该函数 与客户端建立链接 */
/* 在acceptconn内部去做accept */
void acceptconn(int lfd, int events, void *arg)
puts("acception running...\\n");
struct sockaddr_in cin;
socklen_t len = sizeof(cin);
int cfd, i;
if ((cfd = accept(lfd, (struct sockaddr *)&cin, &len)) == -1)
if (errno != EAGAIN && errno != EINTR)
/* 暂时不做出错处理 */
printf("%s: accept, %s\\n", __func__, strerror(errno));
return ;
do
for (i = 0; i < MAX_EVENTS; i++) //从全局数组g_events中找一个空闲元素
if (g_events[i].status == 0) //类似于select中找值为-1的元素
break; //跳出 for
if (i == MAX_EVENTS)
printf("%s: max connect limit[%d]\\n", __func__, MAX_EVENTS);
break; //跳出do while(0) 不执行后续代码
int flag = 0;
if ((flag = fcntl(cfd, F_SETFL, O_NONBLOCK)) < 0) //将cfd也设置为非阻塞
printf("%s: fcntl nonblocking failed, %s\\n", __func__, strerror(errno));
break;
pthread_mutex_lock(&klock);
g_events[i].thread_pos = k;
k = (k+1)%4096; /* k作为参数传递 */
pthread_mutex_unlock(&klock);
ts[g_events[i].thread_pos].cliaddr = cin;
ts[g_events[i].thread_pos].connfd = cfd;
/* 给cfd设置一个 myevent_s 结构体, 回调函数 设置为 recvdata */
eventset(&g_events[i], cfd, recvdata, &g_events[i]);
eventadd(g_efd, EPOLLIN, &g_events[i]); //将cfd添加到红黑树g_efd中,监听读事件
//思路:把任务添加函数threadpool_add封装进结构体的回调函数 recvdata 中
while(0);
printf("new connect [%s:%d][time:%ld], pos[%d]\\n",
inet_ntoa(cin.sin_addr), ntohs(cin.sin_port), g_events[i].last_active, i);
return ;
void *do_rw(void *arg);
void recvdata(int fd, int events, void *arg)
struct myevent_s *ev = (struct myevent_s *)arg;
int position = ev->thread_pos;
ts[position].arg = arg;
ts[position].events = events;
ts[position].fd = fd;
threadpool_t *thp = ev->thp;
threadpool_add(thp, do_rw, (void*)&ts[position]); /* 向线程池中添加任务 */
/* 线程池中的线程,模拟处理业务 */
void *do_rw(void *arg)
puts("do_rw is trigger!\\n");
int i;
struct s_info *ts = (struct s_info *)arg;
int fd = ts->fd;
int events = ts->events;
void *argg = ts->arg;
struct myevent_s *ev = (struct myevent_s *)argg;
char buf[MAXLINE2];
char str[INET_ADDRSTRLEN];
/* 可以在创建线程前设置线程创建属性,设为分离态 */
pthread_detach(pthread_self());
printf("(rw)thread %d is recieving from PORT %d\\n", (unsigned int)pthread_self(), ntohs((*ts).cliaddr.sin_port));
while (1)
int n = Read(fd, buf, MAXLINE2);
if (n == 0) //因为在socket中的读也是阻塞的
printf("the other side has been closed.\\n");
break;
else if(n == -1)
continue;
printf("(rw)thread %d received from %s at PORT %d\\n", (unsigned int)pthread_self(),
inet_ntop(AF_INET, &(*ts).cliaddr.sin_addr, str, sizeof(str)),
ntohs((*ts).cliaddr.sin_port));
for (i = 0; i < n; i++)
buf[i] = toupper(buf[i]);
Write(ts->connfd, buf, n);
close(ts->connfd);
//做完任务之后要把事件从树上摘下
eventdel(fd, ev);
//threadpool_create(2,4096,4096);
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
int i;
threadpool_t *pool = NULL; /* 线程池 结构体 */
do
if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL)
printf("malloc threadpool fail");
break; /*跳出do while*/
pool->min_thr_num = min_thr_num;
pool->max_thr_num = max_thr_num;
pool->busy_thr_num = 0;
pool->live_thr_num = min_thr_num; /* 活着的线程数 初值=最小线程数 */
pool->wait_exit_thr_num = 0;
pool->queue_size = 0; /* 有0个产品 */
pool->queue_max_size = queue_max_size; /* 最大任务队列数 */
pool->queue_front = 0;
pool->queue_rear = 0;
pool->shutdown = false; /* 不关闭线程池 */
/* 根据最大线程上限数, 给工作线程数组开辟空间, 并清零 */
pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num);
if (pool->threads == NULL)
printf("malloc threads fail");
break;
memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);
/* 给 任务队列 开辟空间 */
pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);
if (pool->task_queue == NULL)
printf("malloc task_queue fail");
break;
/* 初始化互斥琐、条件变量 */
if (pthread_mutex_init(&(pool->lock), NULL) != 0
|| pthread_mutex_init(&(pool->thread_counter), NULL) != 0
|| pthread_cond_init(&(pool->queue_not_empty), NULL) != 0
|| pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
printf("init the lock or cond fail");
break;
/* 启动 min_thr_num 个 work thread */
for (i = 0; i < min_thr_num; i++)
pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool); /*pool指向当前线程池,作为线程回调函数的参数 void* args*/
printf("start thread %d...\\n", (unsigned int)pool->threads[i]);
/* 创建 1 个管理者线程。 */
pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool); /* 创建管理者线程 */
return pool;
while (0);
threadpool_free(pool); /* 前面代码调用失败时,释放poll存储空间 */
return NULL;
/* 向线程池中 添加一个任务 */
//threadpool_add(thp, process, (void*)&num[i]); /* 向线程池中添加任务 process: 小写---->大写*/
/* 在这个函数里面把任务写道任务队列中 */
int threadpool_add(threadpool_t *pool, void*<以上是关于用Unix / C实现基于自动扩/减容线程池+epoll反应堆检测沉寂用户模型的服务器框架(含源码)的主要内容,如果未能解决你的问题,请参考以下文章