Linux中epoll+线程池实现高并发
Posted ctz
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux中epoll+线程池实现高并发相关的知识,希望对你有一定的参考价值。
服务器并发模型通常可分为单线程和多线程模型,这里的线程通常是指“I/O线程”,即负责I/O操作,协调分配任务的“管理线程”,而实际的请求和任务通常交由所谓“工作者线程”处理。通常多线程模型下,每个线程既是I/O线程又是工作者线程。所以这里讨论的是,单I/O线程+多工作者线程的模型,这也是最常用的一种服务器并发模型。我所在的项目中的server代码中,这种模型随处可见。它还有个名字,叫“半同步/半异步“模型,同时,这种模型也是生产者/消费者(尤其是多消费者)模型的一种表现。
这种架构主要是基于I/O多路复用的思想(主要是epoll,select/poll已过时),通过单线程I/O多路复用,可以达到高效并发,同时避免了多线程I/O来回切换的各种开销,思路清晰,易于管理,而基于线程池的多工作者线程,又可以充分发挥和利用多线程的优势,利用线程池,进一步提高资源复用性和避免产生过多线程。
线程池你开10个线程当然可以一上来全部accept阻塞住,这样客户端一连上来便会自动激活一个线程去处理,但是设想一下,如果10个线程全部用掉了,第11个客户端就会发生丢弃。这样为了实现”高并发“你得不断加大线程池的数量。这样会带来严重的内存占用和线程切换的时延问题。
于是前置事件轮询设施的方案就应运而生了,
主线程轮询负责IO,作业交给线程池。
在高并发下,10W个客户端上来,就主线程负责accept,放到队列中,不至于发生没有及时握手而丢弃掉连接的情况发生,而作业线程从队列中认领作业,做完回复主线程,主线程负责write。这样可以用极少的系统资源处理大数量连接。
在低并发下,比如2个客户端上来,也不会出现100个线程hold住在那从而发生系统资源浪费的情况。
正确实现基本线程池模型的核心:
主线程负责所有的 I/O 操作,收齐一个请求所有数据之后如果有必要,交给工作线程进行处理 。处理完成之后,把需要写回的数据还给主线程去做写回 / 尝试写回数据直到阻塞,然后交回主线程继续。
这里「如果有必要」的意思是:经过测量,确认这个处理过程中所消耗的 CPU 时间(不包括任何 I/O 等待,或者相关的 I/O 等待操作无法用 epoll 接管)相当显著。如果这个处理过程(不包含可接管的 I/O 操作)不显著,则可以直接放在主线程里解决。
这个「必要」与否的前提不过三个词:假设,分析,测量。
实现单I/O线程的epoll模型是本架构的第一个技术要点,主要思想如下:
单线程创建epoll并等待,有I/O请求(socket)到达时,将其加入epoll并从线程池中取一个空闲工作者线程,将实际的业务交由工作者线程处理。
以上摘自:https://www.cnblogs.com/cthon/p/9139384.html
创建一个epoll实例; while(server running) { epoll等待事件; if(新连接到达且是有效连接) { accept此连接; 将此连接设置为non-blocking; 为此连接设置event(EPOLLIN | EPOLLET ...); 将此连接加入epoll监听队列; 从线程池取一个空闲工作者线程并处理此连接; } else if(读请求) { 从线程池取一个空闲工作者线程并处理读请求; } else if(写请求) { 从线程池取一个空闲工作者线程并处理写请求; } else 其他事件; }
刚学线程池,若有误请大家指出(可联系我,下有邮箱):
服务器代码:
lock.h
/************************************************************************* > File Name: lock.h > Author: gushi > Mail: 971859774@qq.com > Created Time: 2018年11月22日 星期四 20时06分55秒 ************************************************************************/ #ifndef LOCK_H #define LOCK_H #include <iostream> #include <pthread.h> #include <semaphore.h> using namespace std; class Sem { private: sem_t sem; public: Sem(); ~Sem(); bool wait(); bool post(); }; Sem::Sem() { if(sem_init(&sem,0,0)!=0)//信号量的初始值和和基于内存的信号量 cerr<<"sem init error."<<endl; } Sem::~Sem() { sem_destroy(&sem); } bool Sem::wait() { return sem_wait(&sem)==0?true:false; } bool Sem::post() { return sem_post(&sem)==0?true:false; } //互斥类 class Mutex { private: pthread_mutex_t mutex; public: Mutex(); ~Mutex(); bool mutex_lock(); bool mutex_unlock(); }; Mutex::Mutex() { if(pthread_mutex_init(&mutex,NULL)!=0)//可用PTHRAD_MUTEX_INITIALIZER宏初始化 cerr<<"mutex init error"<<endl; } Mutex::~Mutex() { pthread_mutex_destroy(&mutex); } bool Mutex::mutex_lock() { return pthread_mutex_lock(&mutex)==0?true:false; } bool Mutex::mutex_unlock() { return pthread_mutex_unlock(&mutex)==0?true:false; } //条件变量的类 class Cond { private: pthread_mutex_t mutex; pthread_cond_t cond; public: Cond(); ~Cond(); bool wait(); bool signal(); bool broadcast(); }; Cond::Cond() { if(pthread_mutex_init(&mutex,NULL)!=0) { cerr<<"Cond mutex init error"<<endl; exit(0); } if(pthread_cond_init(&cond,NULL)!=0) { cerr<<"Cond cond init error"<<endl; pthread_mutex_destroy(&mutex); exit(0); } } Cond::~Cond() { pthread_mutex_destroy(&mutex); pthread_cond_destroy(&cond); } bool Cond::wait() { int rs=0; pthread_mutex_lock(&mutex); rs=pthread_cond_wait(&cond,&mutex); pthread_mutex_unlock(&mutex); return rs==0?true:false; } bool Cond::signal() { return pthread_cond_signal(&cond)==0?true:false; } bool Cond::broadcast() { return pthread_cond_broadcast(&cond); } #endif
threadpool.h,还没有实现动态增加功能,以后待更新...
/************************************************************************* > File Name: threadpool.h > Author: gushi > Mail: 971859774@qq.com > Created Time: 2018年11月22日 星期四 20时32分12秒 ************************************************************************/ #ifndef THREADPOOL_H #define THREADPOOL_H #include <queue> #include <vector> #include <exception> #include <errno.h> #include "lock.h" #define MAX_THREADS 1024 using namespace std; template <class T> class Threadpool { private: int idle;//线程池中空闲线程的数量 int num;//线程池中线程数 vector<pthread_t> idle_tid;//空闲线程的集合 vector<pthread_t> busy_tid;//正在执行任务的线程的集合 queue<T *> task_queue;//任务队列 Mutex mutex;//互斥锁 Cond cond;//条件变量锁 bool is_stop;//是否结束线程 public: static void *worker(void *arg);//线程函数,里面执行run函数 void run(); T *get_task();//获取任务函数 int mv_to_idle(pthread_t tid);//执行任务完成后,放入空闲 int mv_to_busy(pthread_t tid);//移入到忙碌线程中 public: Threadpool(int n=20); ~Threadpool(); bool append_task(T *task);//添加任务函数 void start();//开始创建线程池 void stop();//线程停止函数 }; template <class T> Threadpool<T>::Threadpool(int n):num(n),idle(n),is_stop(false) { if(num<=0) { cerr<<"threadpool can\'t init because num<=0."<<endl; exit(1); } } template <class T> Threadpool<T>::~Threadpool() { stop(); } template <class T> bool Threadpool<T>::append_task(T *task) { mutex.mutex_lock();//临界资源上锁 bool is_signal=task_queue.empty(); task_queue.push(task); mutex.mutex_unlock();//解锁 if(is_signal)//signal to null queue cond.signal(); return true; } template <class T> void Threadpool<T>::start() { for(int i=0;i<num;++i) { pthread_t tid=0; if(pthread_create(&tid,NULL,worker,this)!=0)//this参数的传递对于开启线程运行函数要用到, { throw exception(); exit(1); } idle_tid.push_back(tid);//加入到空闲线程集合 } } template <class T> void Threadpool<T>::stop() { is_stop=true; cond.broadcast(); } template <class T> void *Threadpool<T>::worker(void *arg) { Threadpool<T> *thread=(Threadpool<T> *)arg;//thread为一个线程池的指针,指向整个线程池, thread->run();//调用线程运行函数,真正执行工作的函数 return thread; } template <class T> void Threadpool<T>::run() { pthread_t tid=pthread_self();//mutex.mutex_lock();这会造成与append_task函数构成死锁,访问task_queue与idle_tid与busy_tid等临界资源时,可以再各自的函数实现中加锁 while(1)//if { T *task=get_task();//函数实现中有互斥锁临界访问 if(task==NULL) { cerr<<"task_queue is null.wait()"<<endl; cond.wait(); } else { mv_to_busy(tid);//函数实现中有互斥锁保护访问 task->doit();//工作函数 mv_to_idle(tid); } } //mutex.mutex_unlock(); } template <class T> T *Threadpool<T>::get_task() { T *task=NULL; mutex.mutex_lock(); if(!task_queue.empty()) { task=task_queue.front(); task_queue.pop(); } mutex.mutex_unlock(); return task; } template <class T> int Threadpool<T>::mv_to_idle(pthread_t tid) { vector<pthread_t>::iterator busy_iter=busy_tid.begin(); while(busy_iter!=busy_tid.end()) { if(tid==*busy_iter) break; ++busy_iter; } mutex.mutex_lock(); busy_tid.erase(busy_iter);//此线程空闲,从繁忙任务队列中移除 idle_tid.push_back(tid);//添加到空闲线程集合中 //mutex.mutex_lock(); ++idle; mutex.mutex_unlock(); return 0; } template <class T> int Threadpool<T>::mv_to_busy(pthread_t tid) { vector<pthread_t>::iterator idle_iter=idle_tid.begin(); while(idle_iter!=idle_tid.end()) { if(tid==*idle_iter) break; ++idle_iter; } mutex.mutex_lock(); idle_tid.erase(idle_iter); busy_tid.push_back(tid); //mutex.mutex_lock(); --idle; mutex.mutex_unlock(); } #endif
epollserver.h
/************************************************************************* > File Name: epollserver.h > Author: gushi > Mail: 971859774@qq.com > Created Time: 2018年11月23日 星期五 17时07分25秒 ************************************************************************/ #ifndef EPOLL_SERVER_H #define EPOLL_SERVER_H #include <sys/socket.h> #include <sys/types.h> #include <sys/wait.h> #include <sys/epoll.h> #include <netinet/in.h> #include <arpa/inet.h> #include <string.h> #include <fcntl.h> #include <unistd.h>//write头文件 #include <errno.h> #include "threadpool.h" #define MAX_EVENT 1024 #define MAX_BUFFER 2048 using namespace std; class Basetask { public: virtual void doit()=0; }; class Task:public Basetask { private: int sockfd; char msg[MAX_BUFFER]; public: Task(int ,char *); void doit(); }; Task::Task(int fd,char *str):sockfd(fd) { memset(msg,0,sizeof(msg)); strcpy(msg,str); } void Task::doit() { cout<<"server reveive message is: "<<msg<<endl; write(sockfd,msg,strlen(msg)); //Threadpool<Task>::mv_to_idle(tid); } class Epollserver { private: bool is_stop;//是否停止epoll_wait int num;//线程数目 int sockfd; int port; int epollfd; Threadpool<Task> *pool;//线程池的指针 epoll_event events[MAX_EVENT]; struct sockaddr_in servaddr; public: Epollserver(int p,int n); Epollserver(){} ~Epollserver(); void init(); void epoll(); static int setnonblocking(int fd); static void addfd(int epollfd,int sockfd,bool onshot); }; Epollserver::Epollserver(int p,int n):port(p),num(n),is_stop(false), pool(NULL) { } Epollserver::~Epollserver() { delete pool; } void Epollserver::init() { bzero(&servaddr,sizeof(servaddr)); servaddr.sin_family=AF_INET; servaddr.sin_addr.s_addr=htonl(INADDR_ANY); servaddr.sin_port=htons(port); //监听套接字 sockfd=socket(AF_INET,SOCK_STREAM,0); if(sockfd<0) { cerr<<"Epollserver socket init error"<<endl; exit(1); } int tmp=bind(sockfd,(struct sockaddr *)&servaddr,sizeof(servaddr)); if(tmp<0) { cerr<<"Epollserver bind init error"<<endl; exit(1); } tmp=listen(sockfd,10); if(tmp<0) { //cout<<errno<<endl; cerr<<"Epollserver listen init error:"<<strerror(errno)<<endl; exit(1); } epollfd=epoll_create(1024); if(epollfd<0) { cerr<<"Epollserver epoll_create init error"<<endl; exit(1); } //创建线程池,num是线程池中线程的个数,调用构造函数 pool=new Threadpool<Task>(num); } void Epollserver::epoll() { pool->start();//启动线程池 addfd(epollfd,sockfd,false); while(!is_stop) { int ret=epoll_wait(epollfd,events,MAX_EVENT,-1); if(ret<0) { cerr<<"epoll_wait error"<<endl; exit(1); } for(int i=0;i<ret;++i) { //int fd=events[i].data.fd; if(events[i].data.fd==sockfd) { struct sockaddr_in cliaddr; socklen_t len=sizeof(cliaddr); //accpet 返回已连接套接字 int confd=accept(sockfd,(struct sockaddr *)&cliaddr,&len); Epollserver::addfd(epollfd,confd,false); } else if(events[i].events&EPOLLIN)//有数据可读 { char buffer[MAX_BUFFER]; int fd=events[i].data.fd;//接受已连接套接字,对客户端进行内容回送 readagain: memset(buffer,0,sizeof(buffer)); ret=read(fd,buffer,MAX_BUFFER-1); if(ret==0)//某个fd关闭连接 { struct epoll_event ev; ev.events=EPOLLIN; ev.data.fd=fd; epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev); shutdown(fd,SHUT_RDWR); cout<<fd<<" exit"<<endl; continue; } else if(ret<0)//读取失败 { if(errno==EAGAIN) { cout<<"read error. read again"<<endl; goto readagain; break; } } else//读取成功 { Task *task=new Task(fd,buffer); pool->append_task(task); } }//else if else cerr<<"something else had happend"<<endl; }//for }//while close(sockfd); pool->stop(); } int Epollserver::setnonblocking(int fd) { int old_opt=fcntl(fd,F_GETFL); int new_opt=old_opt|O_NONBLOCK; fcntl(fd,F_SETFL,new_opt); return old_opt; } void Epollserver::addfd(int epollfd,int sockfd,bool oneshot) { epoll_event event; event.data.fd=sockfd; event.events=EPOLLIN|EPOLLET; if(oneshot) event.events|=EPOLLONESHOT; epoll_ctl(epollfd,EPOLL_CTL_ADD,sockfd,&event); Epollserver::setnonblocking(sockfd); } #endif
server.cpp服务器的主函数
/************************************************************************* > File Name: server.cpp > Author: gushi > Mail: 971859774@qq.com > Created Time: 2018年11月23日 星期五 19时34分29秒 ************************************************************************/ #include "epollserver.h" #define INDARRY_PORT 9877 using namespace std; int main(int argc,char **argv) { Epollserver *epoll=new Epollserver(INDARRY_PORT,20); epoll->init();//对初始化服务器(socket,bind,listen,epoll_create...等函数,病完成线程池的初始化) epoll->epoll();//开启线程池,完成相应的任务添加之后,自动调用线程池中空闲的函数来完成doit工作 return 0; }
客户端程序:
/************************************************************************* > File Name: client.cpp > Author:gushi > Mail: 971859774@qq.com > Created Time: 2018年11月24日 星期六 15时36分23秒 ************************************************************************/ #include <iostream> #include <sys/socket.h> #include <sys/types.h> #include <arpa/inet.h> #include <unistd.h> #include <netinet/in.h> #include <strings.h> #define SERV_PORT 9877 #define MAXLINE 1204 using namespace std; void str_cli(FILE *fp,int sockfd) { fd_set set; FD_ZERO(&set); char buff[1024]; int stdineof=0,n; while(1) { if(stdineof==0) FD_SET(fileno(fp),&set); FD_SET(sockfd,&set); int maxfd=max(fileno(fp),sockfd)+1; select(maxfd,&set,NULL,NULL,NULL); if(FD_ISSET(sockfd,&set)) { if((n=read(sockfd,buff,MAXLINE))==0) if(stdineof==1) return; else cerr<<"str_cli: server terinated peraturely"<<endl; write(fileno(stdout),buff,n); } else if(FD_ISSET(fileno(fp),&set)) { if((n=read(fileno(fp),buff,MAXLINE))==0)//客户完成输入 stdineof=1; write(sockfd,buff,n); //shutdown(sockfd,SHUT_WR); FD_CLR(fileno(fp),&set); continue; } //write(sockfd,buff,n); } return; } int main(int argc,char **argv) { if(argc!=2) { cerr<<"please input server address."<<endl; exit(1); } int sockfd=socket(AF_INET,SOCK_STREAM,0); struct sockaddr_in servaddr; bzero(&servaddr,sizeof(servaddr)); servaddr.sin_family=AF_INET; inet_pton(AF_INET,argv[1],&servaddr.sin_addr); servaddr.sin_port=htons(SERV_PORT); connect(sockfd,(struct sockaddr *)&servaddr,sizeof(servaddr)); str_cli(stdin,sockfd); return 0; }
以上是关于Linux中epoll+线程池实现高并发的主要内容,如果未能解决你的问题,请参考以下文章