用Unix / C实现基于自动扩/减容线程池+epoll反应堆检测沉寂用户模型的服务器框架(含源码)

Posted 狱典司

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了用Unix / C实现基于自动扩/减容线程池+epoll反应堆检测沉寂用户模型的服务器框架(含源码)相关的知识,希望对你有一定的参考价值。

用Unix / C实现基于自动扩/减容线程池+epoll反应堆模型的服务器框架

前言

由于作者本人最近在忙期末复习(KPI压力,害),目前暂时不对该代码的各类结构体、函数单独拎出来做分析了,这项工作等到这个月考完试之后在完成!

代码的大部分内容都有比较详细的注释,这一个月内如果有读者有看不懂源码的地方可以在评论区提问,我看到会回复!如有误也欢迎批评指正!

这个基于线程池和epoll反应堆的服务器大概有以下几个特点吧,简要说一下:

  1. 综合了线程池和epoll反应堆的优点,这个一个月后做详细说明。
  2. 线程池实现了根据客户端的数目(任务数量)实现自动扩容和瘦身。
  3. epoll反应堆额外实现了检测沉寂用户的功能,若一个客户端连接在60秒内没有发送任何消息,服务器会主动断开连接。

注意:该源码的线程回调函数只是提供了最基础的大小写转换以测试C/S双端的联通性,可以在线程回调函数do_rw()上做自定义的拓展。


转发必须注明出处!

服务器端源码

文件名:server.c

#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <signal.h>
#include <errno.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <time.h>
#include <ctype.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include "helper.h"

#define DEFAULT_TIME 1                 /*1s检测一次*/
#define MIN_WAIT_TASK_NUM 2            /*如果queue_size > MIN_WAIT_TASK_NUM 添加新的线程到线程池*/ 
#define MAX_EVENTS  4096               //epoll监听上限数
#define BUFLEN 4096			

#define DEFAULT_THREAD_VARY 5          /*每次创建和销毁线程的个数*/
#define true 1
#define false 0

#define MAXLINE2 4096
#define SERV_PORT2 7777

void recvdata(int fd, int events, void *arg);
void senddata(int fd, int events, void *arg);

typedef struct 
    void *(*function)(void *);          /* 函数指针,回调函数 */
    void *arg;                          /* 上面函数的参数 */

 threadpool_task_t;                    /* 各子线程任务结构体 */

/* 描述线程池相关信息 */

typedef struct threadpool_t 
    pthread_mutex_t lock;               /* 用于锁住本结构体 */    
    pthread_mutex_t thread_counter;     /* 记录忙状态线程个数de琐 -- busy_thr_num */

    pthread_cond_t queue_not_full;      /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */
    pthread_cond_t queue_not_empty;     /* 任务队列里不为空时,通知等待任务的线程 */

    pthread_t *threads;                 /* 存放线程池中每个线程的tid。数组 */
    pthread_t adjust_tid;               /* 存管理线程tid */
    threadpool_task_t *task_queue;      /* 任务队列(数组首地址) */

    int min_thr_num;                    /* 线程池最小线程数 */
    int max_thr_num;                    /* 线程池最大线程数 */
    int live_thr_num;                   /* 当前存活线程个数 */
    int busy_thr_num;                   /* 忙状态线程个数 */
    int wait_exit_thr_num;              /* 要销毁的线程个数 */

    int queue_front;                    /* task_queue队头下标 */
    int queue_rear;                     /* task_queue队尾下标 */
    int queue_size;                     /* task_queue队中实际任务数 */
    int queue_max_size;                 /* task_queue队列可容纳任务数上限 */

    int shutdown;                       /* 标志位,线程池使用状态,true或false */
threadpool_t;

void *threadpool_thread(void *threadpool);
int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg);

void *adjust_thread(void *threadpool);

int is_thread_alive(pthread_t tid);
int threadpool_free(threadpool_t *pool);


/* 描述就绪文件描述符相关信息 */

struct myevent_s 
    int fd;                                                 //要监听的文件描述符
    int events;                                             //对应的监听事件
    void *arg;                                              //泛型参数
    void (*call_back)(int fd, int events, void *arg);       //回调函数
    int status;                                             //是否在监听:1->在红黑树上(监听), 0->不在(不监听)
    char buf[BUFLEN];
    int len;
    long last_active;                                       //记录每次加入红黑树 g_efd 的时间值
    int thread_pos;											//记录全局数组 struct s_info ts[256] 的下标供线程使用 
    threadpool_t *thp;

;


struct s_info 				/* thread action args */
	struct sockaddr_in cliaddr;
	int connfd;
	
	int fd;								/*              */ 
	int events;							// 这三个参数用于将epoll监听红黑树上的回调函数和线程池的回调函数连接起来 
	void *arg;							/*              */
;




int g_efd;                                                  //全局变量, 保存epoll_create返回的文件描述符
struct myevent_s g_events[MAX_EVENTS+1];                    //自定义结构体类型数组. +1-->listen fd
struct s_info ts[4096]; 										//用于记录线程 
int k = 0; 
/*  ev.thread_pos = k;								       另外启用一个线程准备work 
	k = (k+1)%4096;	
	这个k还是要上一个锁		
*/ 
pthread_mutex_t klock;




/*将结构体 myevent_s 成员变量 初始化*/
/* eventset(&g_events[MAX_EVENTS], lfd, acceptconn, &g_events[MAX_EVENTS]); */
void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void *), void *arg)

    ev->fd = fd;
    ev->call_back = call_back;
    ev->events = 0;
    ev->arg = arg;
    ev->status = 0;
    memset(ev->buf, 0, sizeof(ev->buf));
    ev->len = 0;
    ev->last_active = time(NULL);                       //调用eventset函数的时间

    return;


/* 向 epoll监听的红黑树 添加一个 文件描述符 */
/* eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]); */
void eventadd(int efd, int events, struct myevent_s *ev)

	/* 从自定义的结构体指针struct myevent_s *的变量ev中 提取数据到一个可以挂在到epoll监听红黑树上的struct epoll_event变量 epv上 */
    struct epoll_event epv = 0, 0;
    int op;
    epv.data.ptr = ev;
//    epv.events = ev->events = events;       //EPOLLIN 或 EPOLLOUT
    epv.events = EPOLLIN | EPOLLET;
    if (ev->status == 0)                                           //已经在红黑树 g_efd 里
        op = EPOLL_CTL_ADD;                 //将其加入红黑树 g_efd, 并将status置1
        ev->status = 1;
    

    if (epoll_ctl(efd, op, ev->fd, &epv) < 0)                       //实际添加/修改
        printf("event add failed [fd=%d], events[%d]\\n", ev->fd, events);
    else
        printf("event add OK [fd=%d], op=%d, events[%0X]\\n", ev->fd, op, events);

    return ;


/* 从epoll 监听的 红黑树中删除一个 文件描述符*/

void eventdel(int efd, struct myevent_s *ev)

    struct epoll_event epv = 0, 0;

    if (ev->status != 1)                                        //不在红黑树上
        return ;

    //epv.data.ptr = ev;
    epv.data.ptr = NULL;										//抹去指针
    ev->status = 0;                                             //修改状态
    epoll_ctl(efd, EPOLL_CTL_DEL, ev->fd, &epv);                //从红黑树 efd 上将 ev->fd 摘除

    return ;


/*  当有文件描述符就绪, epoll返回, 调用该函数 与客户端建立链接 */


/* 在acceptconn内部去做accept */
void acceptconn(int lfd, int events, void *arg)

    puts("acception running...\\n");
    struct sockaddr_in cin;
    socklen_t len = sizeof(cin);
    int cfd, i;

    if ((cfd = accept(lfd, (struct sockaddr *)&cin, &len)) == -1) 
        if (errno != EAGAIN && errno != EINTR) 
            /* 暂时不做出错处理 */
        
        printf("%s: accept, %s\\n", __func__, strerror(errno));
        return ;
    

    do 
        for (i = 0; i < MAX_EVENTS; i++)                                //从全局数组g_events中找一个空闲元素
            if (g_events[i].status == 0)                                //类似于select中找值为-1的元素
                break;                                                  //跳出 for

        if (i == MAX_EVENTS) 
            printf("%s: max connect limit[%d]\\n", __func__, MAX_EVENTS);
            break;                                                      //跳出do while(0) 不执行后续代码
        

        int flag = 0;
        if ((flag = fcntl(cfd, F_SETFL, O_NONBLOCK)) < 0)              //将cfd也设置为非阻塞
            printf("%s: fcntl nonblocking failed, %s\\n", __func__, strerror(errno));
            break;
        
        pthread_mutex_lock(&klock);
        g_events[i].thread_pos = k;
        k = (k+1)%4096;		/*  k作为参数传递  */
	pthread_mutex_unlock(&klock);
        ts[g_events[i].thread_pos].cliaddr = cin;
		ts[g_events[i].thread_pos].connfd = cfd;

        /* 给cfd设置一个 myevent_s 结构体, 回调函数 设置为 recvdata */
        eventset(&g_events[i], cfd, recvdata, &g_events[i]);		
        eventadd(g_efd, EPOLLIN, &g_events[i]);                         //将cfd添加到红黑树g_efd中,监听读事件
        
		//思路:把任务添加函数threadpool_add封装进结构体的回调函数 recvdata 中 
     while(0);

    printf("new connect [%s:%d][time:%ld], pos[%d]\\n",
            inet_ntoa(cin.sin_addr), ntohs(cin.sin_port), g_events[i].last_active, i);
    return ;


void *do_rw(void *arg);


void recvdata(int fd, int events, void *arg)
	
	struct myevent_s *ev = (struct myevent_s *)arg;
	
	int position =  ev->thread_pos; 
	
	ts[position].arg = arg;
	ts[position].events = events;
	ts[position].fd = fd;
	threadpool_t *thp = ev->thp;
	threadpool_add(thp, do_rw, (void*)&ts[position]);   /* 向线程池中添加任务 */	


	
	/* 线程池中的线程,模拟处理业务 */
void *do_rw(void *arg)

	puts("do_rw is trigger!\\n");
	int i;
	struct s_info *ts = (struct s_info *)arg;
	int fd = ts->fd; 
	int events = ts->events; 
	void *argg = ts->arg;
	
	struct myevent_s *ev = (struct myevent_s *)argg;
	
	char buf[MAXLINE2];
	char str[INET_ADDRSTRLEN];
	/* 可以在创建线程前设置线程创建属性,设为分离态 */
	pthread_detach(pthread_self());
	printf("(rw)thread %d is recieving from PORT %d\\n", (unsigned int)pthread_self(), ntohs((*ts).cliaddr.sin_port));


	while (1) 
		int n = Read(fd, buf, MAXLINE2);
		if (n == 0) 		//因为在socket中的读也是阻塞的
			printf("the other side has been closed.\\n");
			break;
		else if(n == -1)
			continue;
		
		printf("(rw)thread %d  received from %s at PORT %d\\n", (unsigned int)pthread_self(),
				inet_ntop(AF_INET, &(*ts).cliaddr.sin_addr, str, sizeof(str)),
				ntohs((*ts).cliaddr.sin_port));
		for (i = 0; i < n; i++)
			buf[i] = toupper(buf[i]);
		Write(ts->connfd, buf, n);
	
	close(ts->connfd);
	
	//做完任务之后要把事件从树上摘下 
	eventdel(fd, ev);




//threadpool_create(2,4096,4096);  
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)

    int i;
    threadpool_t *pool = NULL;          /* 线程池 结构体 */

    do 
        if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL)   
            printf("malloc threadpool fail");
            break;                                      /*跳出do while*/
        

        pool->min_thr_num = min_thr_num;
        pool->max_thr_num = max_thr_num;
        pool->busy_thr_num = 0;
        pool->live_thr_num = min_thr_num;               /* 活着的线程数 初值=最小线程数 */
        pool->wait_exit_thr_num = 0;
        pool->queue_size = 0;                           /* 有0个产品 */
        pool->queue_max_size = queue_max_size;          /* 最大任务队列数 */
        pool->queue_front = 0;
        pool->queue_rear = 0;
        pool->shutdown = false;                         /* 不关闭线程池 */

        /* 根据最大线程上限数, 给工作线程数组开辟空间, 并清零 */
        pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); 
        if (pool->threads == NULL) 
            printf("malloc threads fail");
            break;
        
        memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);

        /* 给 任务队列 开辟空间 */
        pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);
        if (pool->task_queue == NULL) 
            printf("malloc task_queue fail");
            break;
        

        /* 初始化互斥琐、条件变量 */
        if (pthread_mutex_init(&(pool->lock), NULL) != 0
                || pthread_mutex_init(&(pool->thread_counter), NULL) != 0
                || pthread_cond_init(&(pool->queue_not_empty), NULL) != 0
                || pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
        
            printf("init the lock or cond fail");
            break;
        

        /* 启动 min_thr_num 个 work thread */
        for (i = 0; i < min_thr_num; i++) 
            pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);   /*pool指向当前线程池,作为线程回调函数的参数 void* args*/
            printf("start thread %d...\\n", (unsigned int)pool->threads[i]);
        
        
        /*  创建 1 个管理者线程。 */
        pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool);     /* 创建管理者线程 */

        return pool;

     while (0);

    threadpool_free(pool);      /* 前面代码调用失败时,释放poll存储空间 */

    return NULL;


/* 向线程池中 添加一个任务 */
//threadpool_add(thp, process, (void*)&num[i]);   /* 向线程池中添加任务 process: 小写---->大写*/
/* 在这个函数里面把任务写道任务队列中 */ 
int threadpool_add(threadpool_t *pool, void*<

以上是关于用Unix / C实现基于自动扩/减容线程池+epoll反应堆检测沉寂用户模型的服务器框架(含源码)的主要内容,如果未能解决你的问题,请参考以下文章

可扩/减容线程池C语言原理讲解及代码实现

可扩/减容线程池C语言原理讲解及代码实现

基于pthread的线程池实现

基于C++11实现的高效线程池及工作原理

基于C++11实现的高效线程池及工作原理

全网最牛的基于C++11新特性线程池设计与实现linux服务器开发