网络 http服务器-v3-libevent版本

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了网络 http服务器-v3-libevent版本相关的知识,希望对你有一定的参考价值。

一、libevent API简介

        1.申请  事件库(维护多个事件句柄的结构体);

               struct event_base *base = event_base_new();

          2.申请 事件句柄,句柄的初始化,添加句柄到事件库

            //创建并绑定一个event

                struct event *listen_event;

           //参数:event_base, 监听的fd,事件类型及属性,绑定的回调函数,给回调函数的参数

                listen_event = event_new(base, listener, EV_READ|EV_PERSIST, callback_func, (void*)base);

           //参数:event,超时时间(struct timeval *类型的,NULL表示无超时设置)

                event_add(listen_event, NULL);

        注:libevent支持的事件及属性包括(使用bitfield实现,所以用|来让他们合体)

                (a) EV_TIMEOUT:超时

                (b) EV_READ: 只要网络缓冲中还有数据,回调函数就会被触发

                (c) EV_WRITE: 只要塞给网络缓冲的数据被写完,回调函数就会被触发

                (d) EV_SIGNAL: POSIX信号量

                (f) EV_ET: Edge-Trigger边缘触发

          3.事件库执行 监听循环(内置循环)

            void event_base_dispatch(base);

          4.回调函数

             typedef void(* event_callback_fn(evutil_socket_t sockfd, short event_type, void *arg))

            注意:该回调函数 第一个参数 与第二个参数  都不需要用户自己传入(在宏定义中已经自己处理、)因此,我们只需传入参数三 即可

          5.其他重点函数

            event_base_loopexit(struct event_base * , const struct timeval* );//停止事件库监听循环。timeval为NULL表示立即停止,否则等待 给定 时间后停止

             void event_free(struct event* event) //将事件句柄 清理并从 事件库的监视中删除

            event_base_free(struct event_base*) //清理事件库(程序逻辑完成后的工作)

         

二、http服务器编写逻辑

       1.注册 、绑定 监听套接字文件描述符、设为监听状态;

         2.注册 事件库,初始化事件,将 监听套接字 添加到事件库,开始循环

         3.分别 编写 accept 、read 、write 、error回调函数

         4.accept回调函数中创建线程,该线程的线程函数中 重新注册事件库 ,将客户端 socket 添加进事件库(此时关注读事件,回调函数为read),开始循环

         5.当该线程读事件 就绪,用read回调函数 处理http请求,将相关信息保存到自己维护的结构体中。停止 事件库循环,将关注的事件改为 写事件,回调函数改为write。

         6.处理完一次http请求,清理缓冲区和事件库。

三、源代码

    libevent.h

#ifndef __LIBEVENT_H__
#define __LIBEVENT_H__

#include "httpd.h"
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <malloc.h>
#include <netdb.h>
#include <sys/utsname.h>
#include <net/if.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <pthread.h>
#include <stdlib.h>

#define _MAX_ACCEPT_ 10

typedef struct fd_event_base{
	evutil_socket_t fd;
	struct event_base * base;
}ev_base;

void libevent_up(int sock);//启动libevent 事件库 循环
void error_cb(evutil_socket_t fd,short event,void * arg);//放生错误的回调函数
void write_cb(evutil_socket_t fd,short event,void * arg);//写事件的回调函数
void read_cb(evutil_socket_t fd,short event,void * arg);//读事件。。。。。
void accept_cb(evutil_socket_t fd,short event, void * arg);//建立连接的回调函数(主线程中)
void* accept_on_thread(void* fd_arg);//子线程处理函数
#endif

libevent.c

#include "libevent.h"

#define _MAXFD_ 10 
#define ERRORIP -1

int timeout_times=0;

int set_non_block(int fd)
{
	int old_flag=fcntl(fd,F_GETFL);
	if(old_flag<0){
		perror("fcntl");
	//	exit(-4);
		return -1;
	}
	if(fcntl(fd,F_SETFL,old_flag|O_NONBLOCK)<0){
		perror("fcntl");
		return -1;
	}

	return 0;

}

int startup(char* ip,int port)
{
	int sock=socket(AF_INET,SOCK_STREAM,0);
	struct sockaddr_in server;
	server.sin_family=AF_INET;
	server.sin_addr.s_addr=inet_addr(ip);
	server.sin_port=htons(port);
	int flag=0;
//	printf("port  %d  %d",port,(htons(port)));
	if(bind(sock,(struct sockaddr *)&server,sizeof(server))<0){
		perror("bind");
		exit(-2);
	}
	if( setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag)) == -1)  
    {  
	    perror("setsockopt");  
	    exit(1);  			    
	}  
	if(listen(sock,50)<0){
		perror("listen");
		exit(-3);
	}
	return sock;
}

void usage(char* arg)
{
	printf("usage %s [ip] [port]\n",arg);

}

void accept_cb(evutil_socket_t fd,short event, void * arg)
{
	pthread_t tid=0;
	if(event==EV_TIMEOUT){
		if((timeout_times++)==10){
			printf("time out too long to exit!\n");
			event_base_loopexit(((ev_base*)arg)->base, NULL);		
			return;
		}

		printf("time out!\n");
		return;
	}	
	evutil_socket_t new_sock=-1;
	struct sockaddr_in client;
	socklen_t len=sizeof(client);
	while((new_sock=accept(((ev_base*)arg)->fd,(struct sockaddr*)&client,&len))){	
		if(new_sock<0){
			//perror("accept");
			break;
	}
		if(set_non_block(new_sock)<0){
			echo_error(new_sock,500);
			continue;
		}
		printf(" accept success :%d\n",new_sock);
		if(pthread_create(&tid,NULL,accept_on_thread,(void*)new_sock)<0){
			perror("pthread_create");
			continue;
		}
		pthread_detach(tid);
	}

}

void* accept_on_thread(void* fd_arg)
{
	//线程内部创建base 来处理不同http请求
	read_buf read_buffer;
	read_buffer._buf.fd=(int)fd_arg;
	read_buffer._base=event_base_new();
	struct event * sock_event=NULL;
	struct timeval timeout;
	timeout.tv_sec=5;
	timeout.tv_usec=0;
	if(read_buffer._base==NULL){
		perror("base_new");
		return NULL;
	}

	sock_event=event_new(read_buffer._base,(evutil_socket_t)fd_arg,EV_READ|EV_PERSIST|EV_ET,read_cb,(void *)&read_buffer);
	if(sock_event==NULL){
		perror("event_new");
		return NULL;
	}
	event_add(sock_event,&timeout);
	event_base_dispatch(read_buffer._base);
//	struct bufferevent* ev_buf;
//	bev=bufferevent_socket_new(read_buffer._base,new_sock,BEV_OPT_CLOSE_ON_FREE );
//	if(ev_buf==NULL){
//		perror("ev_buf");
//		return;
//	}
//	bufferevent_setcb(bev, read_cb, NULL, error_bc, (void*)fd);  
//	bufferevent_enable(bev, EV_READ|EV_PERSIST);
	if(read_buffer._base!=NULL)
	{	
		printf("join_write_event!\n");
		event_free(sock_event);
		sock_event=event_new(read_buffer._base,(evutil_socket_t)fd_arg,EV_WRITE|EV_PERSIST|EV_ET,write_cb,(void *)&read_buffer);
		if(sock_event==NULL){
			perror("event_new");
			return NULL;
		}
		printf("fd:%d\n",read_buffer._buf.fd);
		event_add(sock_event,&timeout);
		printf("join_write_event_loop!\n");
		event_base_dispatch(read_buffer._base);
		
	}
	event_free(sock_event);
	event_base_free(read_buffer._base);
	close((evutil_socket_t)fd_arg);
	printf("client lib out! fd :%d \n",(int)fd_arg);
	return NULL;
}


void read_cb(evutil_socket_t fd,short event,void * arg)
{
	if(event==EV_TIMEOUT){
		printf("time out!\n");
	}
	int ret=event_recv_http((pread_buf)arg);
	if(ret<0){
		event_base_free(((pread_buf)arg)->_base);
		((pread_buf)arg)->_base==NULL;
		return;
	}
	printf("");
	event_base_loopexit(((pread_buf)arg)->_base, NULL);	
	
}
	


void write_cb(evutil_socket_t fd,short event,void * arg)
{
	printf("write_cb\n");
	event_echo_http((pread_buf)arg);
	printf("event_echo_http_success!\n");
	event_base_loopexit(((pread_buf)arg)->_base, NULL);
}

void error_cb(evutil_socket_t fd,short event,void * arg)
{
}
void libevent_up(int sock)
{
	ev_base base;
	
	base.base=event_base_new();
	base.fd=(evutil_socket_t)sock;
	struct event * listen_event=NULL;
	evutil_socket_t listener=sock;
	struct timeval timeout;
	timeout.tv_sec=5;
	timeout.tv_usec=0;
	if(base.base==NULL){
		perror("base_new");
		return;
	}


	listen_event=event_new(base.base,listener,EV_READ|EV_PERSIST|EV_ET,accept_cb,(void *)&base);
	if(listen_event==NULL){
		perror("event_new");
		return;
	}
	event_add(listen_event,&timeout);
	event_base_dispatch(base.base);
	event_free(listen_event);
	event_base_free(base.base);

	printf("server out!\n ");
}


int main(int argc,char* argv[]){
	if(argc!=3){
		usage(argv[0]);
		exit(-1);
	}	
	int port=atoi(argv[2]);
	int listen_sock;
	char* ip=NULL;
	if(strcmp(argv[1],"any")==0){
			int sfd, intr;
			struct ifreq buf[16];
		    struct ifconf ifc;
		    sfd = socket (AF_INET, SOCK_DGRAM, 0); 
		    if (sfd < 0)
				return ERRORIP;
			ifc.ifc_len = sizeof(buf);
			ifc.ifc_buf = (caddr_t)buf;
			if (ioctl(sfd, SIOCGIFCONF, (char *)&ifc))
				return ERRORIP;
			intr = ifc.ifc_len / sizeof(struct ifreq);
			while (intr-- > 0 && ioctl(sfd, SIOCGIFADDR, (char *)&buf[intr]));
				close(sfd);
			ip= inet_ntoa(((struct sockaddr_in*)(&buf[intr].ifr_addr))-> sin_addr);
		
		printf("%s\n",ip);	
		listen_sock=startup(ip,port);
	}
	else

		 listen_sock=startup( argv[1],port);

//	printf("port %s %d",argv[2],port);
	
    set_non_block(listen_sock);
	libevent_up(listen_sock);
	close(listen_sock);
	return 0;
}

http.h

#ifndef __MYHTTP__

#define __MYHTTP__
#include <stdio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/epoll.h>
#include <sys/sendfile.h>
#define _SIZE_ 1024

typedef struct event_buf{//http请求信息的缓冲区
	int fd;
	char method[10];
	char url[1024];
	char parameter[1024];//get方法 参数
	int cgi;
}ev_buf,*pev_buf;

typedef struct ev_read_buf{//libevent 的缓冲区
	struct event_base * _base;
	ev_buf _buf;	
}read_buf,*pread_buf;

char* get_text(int fd,char* buf);//获取正文

int event_recv_http(pread_buf);//读取http请求
int event_echo_http(pread_buf);//回复客户端
void cgi_action(int fd,char* method,char* url,char* parameter);//cgi处理逻辑

int get_line(int sock_fd,char * buf);//获取socket_fd一行

void echo_error(int fd,int _errno);//错误回显
void error_all(int fd,int err,char* reason);//所有错误的处理
void echo_html(int fd,const char* url,int size );//回显网页

#endif

http.c

#include "httpd.h"

#define DEFAULT "/default.html"
#define IMG "src_html"
#define CGI "src_cgi"

int times=0;

void echo_error(int fd,int _errno)
{
	printf("join err\n");
	switch(_errno){
	case 400://Bad Request  //客户端请求有语法错误,不能被服务器所理解
		break;
	case 404:////请求资源不存在,eg:输入了错误的URL
		//printf("*******404\n");
		error_all(fd,404,"NOT_FIND!!");
		break;
	case 401://请求未经授权,这个状态代码必须和WWW-Authenticate报头域一起使用 
		break;
	case 403://服务器收到请求,但是拒绝提供服务 
		break;
	case 500:// Internal Server Error //服务器发生不可预期的错误
		break;
	case 503://Server Unavailable  //服务器当前不能处理客户端的请求,一段时间后可能恢复正常
		break;
	default:
		break;
	}
}

void error_all(int fd,int err,char* reason)
{	
	char buf[_SIZE_]="";
	char error[_SIZE_]="";
	sprintf(buf,"HTTP/1.0 %d %s\r\n\r\n",err,reason);
	sprintf(error," %d %s",err,reason);
	printf("err buf:%s\n error:%s",buf,error);
	write(fd,buf,strlen(buf));
	write(fd,"<html>\n",strlen("<html>\n"));
	write(fd,"<head>",strlen("<head>"));
	write(fd,"<h1> HELLO PPH!!!</h1>\n",strlen("<h1> HELLO PPH!!!</h1>\n"));
	write(fd,"<h2>",strlen("<h2>"));
	write(fd,error,strlen(error));
	write(fd,"</h2>\n",strlen("</h2>\n"));
	write(fd,"</head>\n",strlen("</head>\n"));
	write(fd,"</html>",strlen("</html>"));
//	echo_html(fd,"src_html/1.html",102400000);
}
void echo_html(int fd,const char* url,int fd_size ){	
	char buf[_SIZE_]="HTTP/1.1 200 OK\r\n\r\n";
	
	int rfd=-1;
	off_t set=0;
	ssize_t size=1;
	if((rfd=open(url, O_RDONLY))<0){
		echo_error(fd,500);
		printf("e-html %s",strerror(errno));
	}
	if(write(fd,buf,strlen(buf))<0){
		perror(" echo_html_write");
		close(rfd);
		return;
	}
	int i=0;
	while(set!=fd_size){
		size=sendfile(fd,rfd,&set,fd_size);
		if(errno!=EAGAIN&&size<0){
			printf("sendfile error %s %d\n",strerror(errno),errno);
			break;
		}
	//	printf("\nsend: size %d all size  %d time %d \n",set,fd_size,++i);
	}
	printf("echo html success!\n");
	close(rfd);
	return;	
}

int get_line(int sock_fd,char * line){
	int index=0;
	ssize_t size=1;
	char ch=0;
	printf("getline start\n");	
	while(ch!=‘\n‘){
		if((size=read(sock_fd,&ch,1))<0){
			perror("getline__");
			//if(errno==EAGAIN)
			//	continue;
			//else
				return -2;
		}
		if(ch==‘\r‘){
			char tmp=0;
			if(recv(sock_fd,&tmp,1,MSG_PEEK)>0){
				if(tmp==‘\n‘){
					line[index++]=tmp;
					read(sock_fd,&ch,1);
					continue;
				}
			}
		}
		//printf("index %d\n",index);
		if(index==1024){
			printf("httpd line full exit\n");
			line[1023]=0;
			return -2;
		}
				line[index++]=ch;
	}
	line[index]=0;
	if(strcmp(line,"\n")==0){
		return 0;
	}
	printf("getline  success\n");
	return 1;
}


//获取post正文 参数

char* get_length(int fd,char* content_length)
{
	int size=1;
	int tag=0;
	int index=0;
	while(size!=0){//通过持续读取一行 直到读到空行结束
		size=get_line(fd,content_length);
		if(size==-2)
			continue;
		if(strncasecmp(content_length,"content-length: ",16)==0){
			printf(" length success\n");
			break;
		}
		if(size==-1){
			printf("get line出错\n");
			return NULL;
			}
			
	}
	content_length[strlen(content_length)-1]=0;
	strcpy(content_length,content_length+16);
	printf("con end: %s\n",content_length);
	return content_length;
}


void cgi_action(int fd,char* method,char* url ,char* parameter)
{
	char env[20]="METHOD=";
	char par[_SIZE_]="PARAMETER=";
	int pwrite[2];
	if((pipe(pwrite)<0)){
		perror("pipe");
		return;
	}
	strcat(env,method);
	strcat(par,parameter);
	printf(" act url:%s\n",url);
	printf("parameter:%s\n",par);	
	if(putenv(env)<0){
		perror("putenv");
		return;
	}
	if(putenv(par)<0){
		perror("putenv par");
		return;
	}
//	printf("fork qian\n");
	pid_t id=fork();
	if(id<0){
		perror("fork");
		return;
	}
	else if(id==0){//子进程
		close(pwrite[0]);
		//printf("child\n");
		if(dup2(pwrite[1],1)<0){
			perror("dup2.1");
			return;
		}
		if(dup2(fd,0)<0){
			perror("dup2.2");
			return;
		}
		if(execl(url,NULL)<0){
			perror("execl");
			printf("exit url:\n",url);
			exit(-2);
		}
		 
	}
	else{//父进程
		close(pwrite[1]);
		char buf[_SIZE_]="";
		int count=0;
		int i=0;
		ssize_t size=1;
		while(size>0){
			size=read(pwrite[0],buf,_SIZE_);
			if(size<0){
				echo_error(fd,500);
				break;
			}
			if(size==0)
				break;
			write(fd,buf,strlen(buf));
		
		}
		waitpid(-1,NULL,0);
		close(pwrite[0]);
	}	
}

int event_echo_http(pread_buf ev)
{
	
	char* method=ev->_buf.method;
	char* url=ev->_buf.url;
	char* parameter=ev->_buf.parameter;//get方法 参数
	int fd=ev->_buf.fd;

	int cgi=ev->_buf.cgi;
	struct stat stat_buf; 
	if(cgi==0)	
		if(stat(url, &stat_buf)<0){
			printf("stat <0 \n");
			echo_error(fd,404);
			return 0;
		}

	if(strcasecmp("POST",method)==0){
		
		//printf("already cgi\n");
		cgi_action(fd,method,url,parameter);
	}
	else if(strcasecmp("GET",method)==0){
			if(cgi==1){
			//cgi
		//	printf("rev_http: parameter:%s\n",parameter);
			cgi_action(fd,method,url,parameter);
			printf("ret cgi\n");
		}
		else{
			echo_html(fd,url,stat_buf.st_size);
		}
	
	}
	if(strcasecmp(method,"POST")==0)
		clear_buf(fd);
	return 0;
}

int event_recv_http(pread_buf ev)
{
	if(ev==NULL){
		printf("ev error\n");
		return -1;
	}
	int fd=ev->_buf.fd;
	
	char real_url[128]="src_html";
	char line[_SIZE_];
	char* method=NULL;
	char* version=NULL;
	char* url=NULL;
	char parameter[_SIZE_]="";//get方法 参数
	char  content_length[_SIZE_]="";
	if(get_line(fd,line)==-2){
		printf("it‘s a cache request! so can‘t process!\n");
		return 0;
	}
	int index=strlen(line)-1;
	//GET / HTTP/1.1
	while(index>0){//提取method url
		if(line[index]==‘ ‘&&version==NULL){
			version=((char*)line)+index+1;
			line[index]=0;
		}
		if(line[index]==‘ ‘&&url==NULL){
			url=line+index+1;
			line[index]=0;
		}
		--index;
	}
	method=line;

	ev->_buf.cgi=0;
	if(strcasecmp("GET",method)==0){
		index=0;
		while(url[index]){
			if(url[index]==‘?‘){
				ev->_buf.cgi=1;
				strcpy(parameter,url+index+1);
				url[index]=0;
				ev->_buf.cgi=1;
				break;
			}
			++index;
		}
		
	}
	else if(strcasecmp("POST",method)==0){
		ev->_buf.cgi=1;
		if(get_length(fd,content_length)==NULL){
			echo_error(fd,503);
			printf("get len err\n");
			clear_buf(fd);
			return -1;
		}
		strcpy(parameter,content_length);
	}
	if(strcmp(url,"/")==0){
		strcat(real_url,DEFAULT);
	}
	else{
		if(ev->_buf.cgi==1){
			strcpy(real_url,CGI);
		}
			strcat(real_url,url);
	}
	printf("real_url :%s\n",real_url);
	
	strcpy(ev->_buf.method,method);
	strcpy(ev->_buf.url,real_url);
	strcpy(ev->_buf.parameter,parameter);
	printf(" get connect :%d times !\n",++times);
	if(strcasecmp(method,"get")==0)
		clear_buf(fd);

	return 1;
}

int  clear_buf(int fd)
{
	char buf[_SIZE_]="";
	ssize_t size=0;
	size=read(fd,buf,_SIZE_);
	if(size<0){
		perror("clear_buf_read");
		return -1;
	}
	buf[size]=0;
	return 0;
}


以上是关于网络 http服务器-v3-libevent版本的主要内容,如果未能解决你的问题,请参考以下文章

网络 http服务器-v1-多线程版本

Nginx反代配置

back propogation 的线代描述

网络 http服务器-v2-epoll版本

Nginx反代配置

http代理是啥类型