Linux学习_多路I/O复用

Posted Leslie X徐

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux学习_多路I/O复用相关的知识,希望对你有一定的参考价值。

网络高级编程 多路I/O复用

概念

  • 如recv,send,read和write等函数都是阻塞性函数,若资源没有准备好则调用该函数的进程将进入阻塞状态。
  • 我们可以使用I/O多路复用解决,有两种方式:
    • fcntl函数实现(非阻塞方式)
    • select函数实现
  • 守护进程

fcntl非阻塞实现

在这里插入图片描述

动态数组函数:

/*
 * vector_fd.c
 * 
 */

#include "vector_fd.h"
#include <malloc.h>
#include <assert.h>
#include <stdlib.h>
#include <memory.h>


static void encapacity(VectorFD* vfd)
{
	if(vfd->counter >= vfd->max_counter){
		int *fds = (int*)calloc(vfd->counter+5,sizeof(int));
		assert(fds != NULL);
		memcpy(fds, vfd->fd, sizeof(int)*vfd->counter);
		free(vfd->fd);
		vfd->fd = fds;
		vfd->max_counter += 5;
	}
}

static int indexof(VectorFD *vfd, int fd)
{
	int i=0;
	for(;i < vfd->counter; i++){
		if(vfd->fd[i]==fd) return i;
	}
	return -1;
}

/*创建动态数组*/
VectorFD* create_vector_fd(void)
{
	VectorFD *vfd = (VectorFD*)calloc(1,sizeof(VectorFD));
	assert(vfd != NULL);
	vfd->fd = (int*)calloc(5,sizeof(int));
	assert(vfd->fd != NULL);
	vfd->counter = 0;
	vfd->max_counter = 0;
	return vfd;
}

/*销毁动态数组*/
void destroy_vector_fd(VectorFD *vfd)
{
	assert(vfd != NULL);
	free(vfd->fd);
	free(vfd);
	
}

/*从动态数组中根据下标得到套接字描述符*/
int get_fd(VectorFD* vfd, int index)
{
	assert(vfd != NULL);
	if(index<0 || index > vfd->counter-1)return 0;
	return vfd->fd[index];
}
	
/*删除某一个套接字描述符*/
void remove_fd(VectorFD* vfd, int fd)
{
	assert(vfd != NULL);
	int index = indexof(vfd, fd);
	if(index == -1) return;
	int i = index;
	for(;i < vfd->counter-1; i++){
		vfd->fd[i] = vfd->fd[i+1];
	}
	vfd->counter--;
}

/*增加一个套接字描述符*/
void add_fd(VectorFD* vfd, int fd)
{
	assert(vfd != NULL);
	encapacity(vfd);
	vfd->fd[vfd->counter++] = fd;
	
}



程序实现:

/*
 * TCP服务器.c
 * 
 * 
 */


#include <netdb.h>
#include <sys/socket.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <memory.h>
#include <signal.h>
#include <time.h>
#include <arpa/inet.h>
#include "vector_fd.h"
#include <errno.h>
#include <pthread.h>
#include <fcntl.h>

VectorFD* vfd;
int sockfd;

//登记信号处理函数,使程序接收ctrl+C信号时关闭服务器
void sig_handler(int signo)
{
	if(signo==SIGINT)
	{
		printf("server close\\n");
		/*步骤6:关闭客户端的socket*/
		close(sockfd);
		//销毁动态数组
		destroy_vector_fd(vfd);
		exit(1);
	}

}

/*输出连接上来的客户端相关信息*/
void out_addr(struct sockaddr_in *clientaddr)
{
	//将端口号从网络字节序转换成主机字节序
	int port = ntohs(clientaddr->sin_port);
	char ip[16];
	memset(ip,0,sizeof(ip));
	//将ip地址从网络字节序转换成点分十进制放到ip字符数组中
	inet_ntop(AF_INET,
					&clientaddr->sin_addr.s_addr,
					ip, sizeof(ip));
	printf("client: %s(%d) connected\\n",ip,port);
}

/* fd对应于某个连接的客户端
 * 功能:和某一个连接的客户端进行双向通信
 * (非阻塞方式)
 */
void do_service(int fd)
{
	/*和客户端进行读写操作(双向通信)*/
	char buff[512];
	memset(buff,0,sizeof(buff));
	/*因为采用非阻塞方式,若读不到数据直接返回
	 * 直接服务于下一个客户端
	 * 因此不需要判断size小于0的情况。
	 */
	size_t size = read(fd,buff,sizeof(buff));
	if(size == 0){//客户端已关闭连接
		char info[]="client closed";
		write(STDOUT_FILENO, info, sizeof(info));//使用不带缓存的内核函数
		//从动态数组中删除对应的fd
		remove_fd(vfd, fd);
		//关闭对应客户端的socket
		close(fd);
	}else if(size>0){
		write(STDOUT_FILENO, buff, sizeof(buff));
		if(write(fd, buff, size) < 0){ //客户端关闭,读取出错,返回值为-1
			if(errno==EPIPE){//客户端关闭连接
				perror("write error");
				remove_fd(vfd, fd);
				close(fd);
			} 
		}
	}
}

//线程遍历动态数组中的socket描述符
void* th_fn(void* arg)
{
	int i = 0;
	while(1){
		i=0;
		//遍历动态数组中的socket描述符
		for(; i < vfd->counter; i++){
			do_service(get_fd(vfd, i));
		}
	}
	return NULL;
}

/*主函数*/
int main(int argc, char **argv)
{
	
	if(argc<2)
	{
		printf("usage:%s #port\\n",argv[0]); //传递端口参数
		exit(1);
	}
	/*处理ctl+c信号,关闭服务器*/
	if(signal(SIGINT, sig_handler)==SIG_ERR) 
	{
		perror("signal sigint error");
		exit(1);
	}
	
	/*步骤1:创建socket
	 * 注:socket创建在内核中,是一个结构体。
	 * AF_INET:IPv4
	 * SOCK_STREAM:tcp协议
	 * */
	sockfd = socket(AF_INET, SOCK_STREAM, 0);
	
	/*步骤2:将socket和地址(包括ip、port)进行绑定*/
	struct sockaddr_in serveraddr;
	memset(&serveraddr,0,sizeof(serveraddr));
	//往地址中填入ip、port、internet地址族类型
	serveraddr.sin_family = AF_INET; //IPv4
	serveraddr.sin_port = htons(atoi(argv[1])); //port
	serveraddr.sin_addr.s_addr = INADDR_ANY; //INADDR_ANY响应所有客户端请求
	if(bind(sockfd, (struct sockaddr*)&serveraddr,
				sizeof(serveraddr))<0)
		{perror("bind error");exit(1);}
		
	/*步骤3:调用listen启动监听(指定端口监听)
	 * 通知系统去接受来自客户端的连接请求
	 * (将接收到的客户端连接请求放置到对应的队列中)
	 * 第二个参数:指定队列长度为10
	 * */
	if(listen(sockfd, 10)<0)
	{perror("listen error");exit(1);}
	
	/*步骤4:调用accept函数,
	 * 从队列中获得一个客户端的请求连接,
	 * 并返回新的socket描述符。
	 * 若没有客户端连接,调用此函数会阻塞,直到获得一个客户端连接。
	 * */
	
	//创建放置套接字描述符fd的动态数组
	vfd = create_vector_fd();
	
	pthread_t th;
	int err;
	err = pthread_create(&th,0,th_fn,(void*)0);
	if(err){perror("create error");exit(1);}
	//以分离状态启动子线程
	pthread_detach(th);
	
	
	/* 1)主控线程获得客户端的连接,将新的socket描述符
	 *    放置到动态数组中
	 * 2)启动的子线程负责遍历动态数组中的socket描述符
	 *    并和对应的客户端进行双向通信
	 *    (采用非阻塞方式读写)
	 */
	 struct sockaddr_in clientaddr;
	socklen_t len = sizeof(clientaddr);
	
	while(1){
		int fd = accept(sockfd,
								(struct sockaddr*)&clientaddr,
								&len);
		if(fd<0){
			perror("accept error");
			continue;
		}
		out_addr(&clientaddr);
		
		//将读写修改为非阻塞方式
		int val;
		fcntl(fd, F_GETFL, &val);
		val |= O_NONBLOCK;
		fcntl(fd, F_SETFL, val);
		
		//将返回的新的socket描述符fd加入到动态数组中
		add_fd(vfd, fd);
	}
	
	return 0;
}


select函数实现

select函数定义:

#include <sys/types.h>
#include <sys/time.h>
#include <unistd.h>

int select(int maxfdp1, fd_set* readfds, fd_set* writefds,
				fd_set* exceptfds, struct timeval *timeout);
返回:准备就绪的描述符的数量,若超时则为0,若出错则为-1
功能:委托内核查看这些描述符集里的描述符是否已经准备好可以使用。用户程序调用时会阻塞。

struct timeval{
	long tv_sec;
	long tv_usec;
};

参数:

  • maxfdp1: 最大fd加1值(max fd plus 1)。在三个描述符集中找出最大的描述符编号值,然后加1。表示描述符的数量范围。
  • readfds, writefds和exceptfds:是指向描述符集的指针。这三个描述符集说明了我们关心的可读,可写或处于异常条件的各个描述符。每个描述符集存放在一个fd_set数据类型中。
  • timeout:指定愿意等待的时间
    • timeout的三种值
      • NULL:永远等待,直到捕捉到信号 或 文件描述符准备好为止。
      • 具体值:struct timeval类型的指针,若等待为timeout时间,还没有文件描述符准备好,就立即返回。
      • 0:从不等待,测试所有的描述符并立即返回。

传向select的参数告诉内核
- 我们所关心的描述符
- 对于每个描述符我们所关心的条件(是否可读一个给定的描述符,是否可写一个给定的描述符,是否关心一个描述符的异常条件)
- 希望等待多长时间(可以永远等待,等待一个固定的时间,或者完全不等待)
从select返回时内核告诉我们
- 已准备好的描述符数量
- 哪一个描述符已准备好读、写或异常条件
- 使用这种返回值,就可调用相应的I/O函数(一般是read或write),并且确认该函数不会阻塞。
select函数根据希望进行的文件操作对文件描述符进行分类处理,这里对文件描述符的处理主要涉及4个宏函数
- FD_ZERO(fd_set* set)——清除一个文件描述符集
- FD_SET(int fd, fd_set* set)——将一个文件描述符加入文件描述符集中
- FD_CLR(int fd, fd_set* set)——将一个文件描述符从文件描述符集中清除
- FD_ISSET(int fd, fd_set* set)——测试该集中的一个给定位是否有变化
在使用select函数之前,首先使用FD_ZERO和FD_SET来初始化文件描述符集,在使用select函数时,可循环使用FD_ISSET测试描述符集,在执行完成对相关文件描述符后,使用FD_CLR来清除描述符集。

实现:
在这里插入图片描述

服务器端:

/*
 * IO_tcp_server.c
 * 
 */

/*系统相关头文件*/
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>
#include <time.h>
#include <string.h>
#include <signal.h>
#include <stdlib.h>
#include <errno.h>
/*网络相关头文件*/
#include <sys/socket.h>
#include <sys/select.h>
#include <arpa/inet.h>
#include <memory.h>

/*宏定义*/
#define BUFFLEN 24
#define SERVER_PORT 8888
#define LISTEN_QUE 5
#define CLIENT_NUM 1024

/*函数声明*/
void handle_sig(int signum);
void* handle_request(void *arg);
void* handle_connect(void *arg);

/*全局变量*/
int connect_host[CLIENT_NUM];
int connect_num = 0;
pthread_t th[2];
int serverfd;


/*主函数*/
int main(int argc, char *argv[])
{
	signal(SIGINT,handle_sig);
	signal(SIGPIPE,handle_sig);
	
	struct sockaddr_in local_addr; //本地地址
	memset(connect_host,-1,sizeof(connect_host));
	
	/*建立套接字*/
	serverfd = socket(AF_INET, SOCK_STREAM, 0);
	
	/*初始化地址*/
	memset(&local_addr, 0, sizeof(local_addr));
	local_addr.sin_family = AF_INET;
	local_addr.sin_port = htons(SERVER_PORT);
	local_addr.sin_addr.s_addr = htonl(INADDR_ANY);
	
	/*将socket描述符绑定本地地址和端口*/
	bind(serverfd, (struct sockaddr*)&local_addr, sizeof(local_addr));
	
	/*侦听socket*/
	listen(serverfd, LISTEN_QUE);
	
	/*创建线程处理客户端连接*/
	pthread_create(&th[0], 0, handle_connect,0);
	pthread_detach(th[0]);
	/*创建线程处理客户端请求*/
	pthread_create(&th[1], 0, handle_request,0);
	pthread_detach(th[1]);
	
	while(1);
	
	return 0;
}

/*函数:信号处理*/
void handle_sig(int signum)
{
	pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,0);
	switch(signum){
		case SIGINT:
			pthread_cancel(th[0]);
			pthread_cancel(th[1]);
			close(serverfd);
			printf("server closed\\n");
			exit(0);
			break;
		case SIGPIPE: 
			printf("client close\\n");
			break;
		default:
		break;
	}
}

/*函数:客户端连接*/
void* handle_connect(void *arg)
{
	struct sockaddr_in client_addr;
	socklen_t len = sizeof(client_addr);
	
	/*接收客户端连接*/
	while(1){
		int i = 1;
		int client_fd = accept(serverfd, (struct sockaddr*)&client_addr, &len);
		/*接收客户端请求*/
		printf("A client connect, from: %s\\n",inet_ntoa(client_addr.sin_addr));
		/*查找在线文件描述符数组合适的位置,放入客户端的文件描述符*/
		for(i=0; i<CLIENT_NUM; ++i){
			if(connect_host[i] == -1){
				connect_host[i] = client_fd;
				connect_num ++;
				/*继续轮询等待客户连接*/
				break;
			}
		}
	}
	return NULL;
}

/*函数:客户端请求*/
void* handle_request(void *arg)
{
	time_t now;
	char buff[BUFFLEN]; //收发数据缓冲区
	int size=0;
	int maxfd = -1; //最大侦听文件描述符
	fd_set scanfd; //侦听描述符集合
	struct timeval timeout; //超时
	timeout.tv_sec = 1

以上是关于Linux学习_多路I/O复用的主要内容,如果未能解决你的问题,请参考以下文章

第15章 高并发服务器编程_I/O多路复用

I/O多路复用

Java基础:I/O多路复用模型及Linux中的应用

Linux IO多路复用

I/O多路复用-EPOLL探索

[Linux 高并发服务器] I/O 多路复用