Linux学习_多路I/O复用
Posted Leslie X徐
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux学习_多路I/O复用相关的知识,希望对你有一定的参考价值。
网络高级编程 多路I/O复用
概念
- 如recv,send,read和write等函数都是阻塞性函数,若资源没有准备好则调用该函数的进程将进入阻塞状态。
- 我们可以使用I/O多路复用解决,有两种方式:
- fcntl函数实现(非阻塞方式)
- select函数实现
- 守护进程
fcntl非阻塞实现
动态数组函数:
/*
* vector_fd.c
*
*/
#include "vector_fd.h"
#include <malloc.h>
#include <assert.h>
#include <stdlib.h>
#include <memory.h>
static void encapacity(VectorFD* vfd)
{
if(vfd->counter >= vfd->max_counter){
int *fds = (int*)calloc(vfd->counter+5,sizeof(int));
assert(fds != NULL);
memcpy(fds, vfd->fd, sizeof(int)*vfd->counter);
free(vfd->fd);
vfd->fd = fds;
vfd->max_counter += 5;
}
}
static int indexof(VectorFD *vfd, int fd)
{
int i=0;
for(;i < vfd->counter; i++){
if(vfd->fd[i]==fd) return i;
}
return -1;
}
/*创建动态数组*/
VectorFD* create_vector_fd(void)
{
VectorFD *vfd = (VectorFD*)calloc(1,sizeof(VectorFD));
assert(vfd != NULL);
vfd->fd = (int*)calloc(5,sizeof(int));
assert(vfd->fd != NULL);
vfd->counter = 0;
vfd->max_counter = 0;
return vfd;
}
/*销毁动态数组*/
void destroy_vector_fd(VectorFD *vfd)
{
assert(vfd != NULL);
free(vfd->fd);
free(vfd);
}
/*从动态数组中根据下标得到套接字描述符*/
int get_fd(VectorFD* vfd, int index)
{
assert(vfd != NULL);
if(index<0 || index > vfd->counter-1)return 0;
return vfd->fd[index];
}
/*删除某一个套接字描述符*/
void remove_fd(VectorFD* vfd, int fd)
{
assert(vfd != NULL);
int index = indexof(vfd, fd);
if(index == -1) return;
int i = index;
for(;i < vfd->counter-1; i++){
vfd->fd[i] = vfd->fd[i+1];
}
vfd->counter--;
}
/*增加一个套接字描述符*/
void add_fd(VectorFD* vfd, int fd)
{
assert(vfd != NULL);
encapacity(vfd);
vfd->fd[vfd->counter++] = fd;
}
程序实现:
/*
* TCP服务器.c
*
*
*/
#include <netdb.h>
#include <sys/socket.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <memory.h>
#include <signal.h>
#include <time.h>
#include <arpa/inet.h>
#include "vector_fd.h"
#include <errno.h>
#include <pthread.h>
#include <fcntl.h>
VectorFD* vfd;
int sockfd;
//登记信号处理函数,使程序接收ctrl+C信号时关闭服务器
void sig_handler(int signo)
{
if(signo==SIGINT)
{
printf("server close\\n");
/*步骤6:关闭客户端的socket*/
close(sockfd);
//销毁动态数组
destroy_vector_fd(vfd);
exit(1);
}
}
/*输出连接上来的客户端相关信息*/
void out_addr(struct sockaddr_in *clientaddr)
{
//将端口号从网络字节序转换成主机字节序
int port = ntohs(clientaddr->sin_port);
char ip[16];
memset(ip,0,sizeof(ip));
//将ip地址从网络字节序转换成点分十进制放到ip字符数组中
inet_ntop(AF_INET,
&clientaddr->sin_addr.s_addr,
ip, sizeof(ip));
printf("client: %s(%d) connected\\n",ip,port);
}
/* fd对应于某个连接的客户端
* 功能:和某一个连接的客户端进行双向通信
* (非阻塞方式)
*/
void do_service(int fd)
{
/*和客户端进行读写操作(双向通信)*/
char buff[512];
memset(buff,0,sizeof(buff));
/*因为采用非阻塞方式,若读不到数据直接返回
* 直接服务于下一个客户端
* 因此不需要判断size小于0的情况。
*/
size_t size = read(fd,buff,sizeof(buff));
if(size == 0){//客户端已关闭连接
char info[]="client closed";
write(STDOUT_FILENO, info, sizeof(info));//使用不带缓存的内核函数
//从动态数组中删除对应的fd
remove_fd(vfd, fd);
//关闭对应客户端的socket
close(fd);
}else if(size>0){
write(STDOUT_FILENO, buff, sizeof(buff));
if(write(fd, buff, size) < 0){ //客户端关闭,读取出错,返回值为-1
if(errno==EPIPE){//客户端关闭连接
perror("write error");
remove_fd(vfd, fd);
close(fd);
}
}
}
}
//线程遍历动态数组中的socket描述符
void* th_fn(void* arg)
{
int i = 0;
while(1){
i=0;
//遍历动态数组中的socket描述符
for(; i < vfd->counter; i++){
do_service(get_fd(vfd, i));
}
}
return NULL;
}
/*主函数*/
int main(int argc, char **argv)
{
if(argc<2)
{
printf("usage:%s #port\\n",argv[0]); //传递端口参数
exit(1);
}
/*处理ctl+c信号,关闭服务器*/
if(signal(SIGINT, sig_handler)==SIG_ERR)
{
perror("signal sigint error");
exit(1);
}
/*步骤1:创建socket
* 注:socket创建在内核中,是一个结构体。
* AF_INET:IPv4
* SOCK_STREAM:tcp协议
* */
sockfd = socket(AF_INET, SOCK_STREAM, 0);
/*步骤2:将socket和地址(包括ip、port)进行绑定*/
struct sockaddr_in serveraddr;
memset(&serveraddr,0,sizeof(serveraddr));
//往地址中填入ip、port、internet地址族类型
serveraddr.sin_family = AF_INET; //IPv4
serveraddr.sin_port = htons(atoi(argv[1])); //port
serveraddr.sin_addr.s_addr = INADDR_ANY; //INADDR_ANY响应所有客户端请求
if(bind(sockfd, (struct sockaddr*)&serveraddr,
sizeof(serveraddr))<0)
{perror("bind error");exit(1);}
/*步骤3:调用listen启动监听(指定端口监听)
* 通知系统去接受来自客户端的连接请求
* (将接收到的客户端连接请求放置到对应的队列中)
* 第二个参数:指定队列长度为10
* */
if(listen(sockfd, 10)<0)
{perror("listen error");exit(1);}
/*步骤4:调用accept函数,
* 从队列中获得一个客户端的请求连接,
* 并返回新的socket描述符。
* 若没有客户端连接,调用此函数会阻塞,直到获得一个客户端连接。
* */
//创建放置套接字描述符fd的动态数组
vfd = create_vector_fd();
pthread_t th;
int err;
err = pthread_create(&th,0,th_fn,(void*)0);
if(err){perror("create error");exit(1);}
//以分离状态启动子线程
pthread_detach(th);
/* 1)主控线程获得客户端的连接,将新的socket描述符
* 放置到动态数组中
* 2)启动的子线程负责遍历动态数组中的socket描述符
* 并和对应的客户端进行双向通信
* (采用非阻塞方式读写)
*/
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
while(1){
int fd = accept(sockfd,
(struct sockaddr*)&clientaddr,
&len);
if(fd<0){
perror("accept error");
continue;
}
out_addr(&clientaddr);
//将读写修改为非阻塞方式
int val;
fcntl(fd, F_GETFL, &val);
val |= O_NONBLOCK;
fcntl(fd, F_SETFL, val);
//将返回的新的socket描述符fd加入到动态数组中
add_fd(vfd, fd);
}
return 0;
}
select函数实现
select函数定义:
#include <sys/types.h>
#include <sys/time.h>
#include <unistd.h>
int select(int maxfdp1, fd_set* readfds, fd_set* writefds,
fd_set* exceptfds, struct timeval *timeout);
返回:准备就绪的描述符的数量,若超时则为0,若出错则为-1
功能:委托内核查看这些描述符集里的描述符是否已经准备好可以使用。用户程序调用时会阻塞。
struct timeval{
long tv_sec;
long tv_usec;
};
参数:
- maxfdp1: 最大fd加1值(max fd plus 1)。在三个描述符集中找出最大的描述符编号值,然后加1。表示描述符的数量范围。
- readfds, writefds和exceptfds:是指向描述符集的指针。这三个描述符集说明了我们关心的可读,可写或处于异常条件的各个描述符。每个描述符集存放在一个fd_set数据类型中。
- timeout:指定愿意等待的时间
- timeout的三种值
- NULL:永远等待,直到捕捉到信号 或 文件描述符准备好为止。
- 具体值:struct timeval类型的指针,若等待为timeout时间,还没有文件描述符准备好,就立即返回。
- 0:从不等待,测试所有的描述符并立即返回。
- timeout的三种值
传向select的参数告诉内核
- 我们所关心的描述符
- 对于每个描述符我们所关心的条件(是否可读一个给定的描述符,是否可写一个给定的描述符,是否关心一个描述符的异常条件)
- 希望等待多长时间(可以永远等待,等待一个固定的时间,或者完全不等待)
从select返回时内核告诉我们
- 已准备好的描述符数量
- 哪一个描述符已准备好读、写或异常条件
- 使用这种返回值,就可调用相应的I/O函数(一般是read或write),并且确认该函数不会阻塞。
select函数根据希望进行的文件操作对文件描述符进行分类处理,这里对文件描述符的处理主要涉及4个宏函数
- FD_ZERO(fd_set* set)——清除一个文件描述符集
- FD_SET(int fd, fd_set* set)——将一个文件描述符加入文件描述符集中
- FD_CLR(int fd, fd_set* set)——将一个文件描述符从文件描述符集中清除
- FD_ISSET(int fd, fd_set* set)——测试该集中的一个给定位是否有变化
在使用select函数之前,首先使用FD_ZERO和FD_SET来初始化文件描述符集,在使用select函数时,可循环使用FD_ISSET测试描述符集,在执行完成对相关文件描述符后,使用FD_CLR来清除描述符集。
实现:
服务器端:
/*
* IO_tcp_server.c
*
*/
/*系统相关头文件*/
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>
#include <time.h>
#include <string.h>
#include <signal.h>
#include <stdlib.h>
#include <errno.h>
/*网络相关头文件*/
#include <sys/socket.h>
#include <sys/select.h>
#include <arpa/inet.h>
#include <memory.h>
/*宏定义*/
#define BUFFLEN 24
#define SERVER_PORT 8888
#define LISTEN_QUE 5
#define CLIENT_NUM 1024
/*函数声明*/
void handle_sig(int signum);
void* handle_request(void *arg);
void* handle_connect(void *arg);
/*全局变量*/
int connect_host[CLIENT_NUM];
int connect_num = 0;
pthread_t th[2];
int serverfd;
/*主函数*/
int main(int argc, char *argv[])
{
signal(SIGINT,handle_sig);
signal(SIGPIPE,handle_sig);
struct sockaddr_in local_addr; //本地地址
memset(connect_host,-1,sizeof(connect_host));
/*建立套接字*/
serverfd = socket(AF_INET, SOCK_STREAM, 0);
/*初始化地址*/
memset(&local_addr, 0, sizeof(local_addr));
local_addr.sin_family = AF_INET;
local_addr.sin_port = htons(SERVER_PORT);
local_addr.sin_addr.s_addr = htonl(INADDR_ANY);
/*将socket描述符绑定本地地址和端口*/
bind(serverfd, (struct sockaddr*)&local_addr, sizeof(local_addr));
/*侦听socket*/
listen(serverfd, LISTEN_QUE);
/*创建线程处理客户端连接*/
pthread_create(&th[0], 0, handle_connect,0);
pthread_detach(th[0]);
/*创建线程处理客户端请求*/
pthread_create(&th[1], 0, handle_request,0);
pthread_detach(th[1]);
while(1);
return 0;
}
/*函数:信号处理*/
void handle_sig(int signum)
{
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,0);
switch(signum){
case SIGINT:
pthread_cancel(th[0]);
pthread_cancel(th[1]);
close(serverfd);
printf("server closed\\n");
exit(0);
break;
case SIGPIPE:
printf("client close\\n");
break;
default:
break;
}
}
/*函数:客户端连接*/
void* handle_connect(void *arg)
{
struct sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);
/*接收客户端连接*/
while(1){
int i = 1;
int client_fd = accept(serverfd, (struct sockaddr*)&client_addr, &len);
/*接收客户端请求*/
printf("A client connect, from: %s\\n",inet_ntoa(client_addr.sin_addr));
/*查找在线文件描述符数组合适的位置,放入客户端的文件描述符*/
for(i=0; i<CLIENT_NUM; ++i){
if(connect_host[i] == -1){
connect_host[i] = client_fd;
connect_num ++;
/*继续轮询等待客户连接*/
break;
}
}
}
return NULL;
}
/*函数:客户端请求*/
void* handle_request(void *arg)
{
time_t now;
char buff[BUFFLEN]; //收发数据缓冲区
int size=0;
int maxfd = -1; //最大侦听文件描述符
fd_set scanfd; //侦听描述符集合
struct timeval timeout; //超时
timeout.tv_sec = 1以上是关于Linux学习_多路I/O复用的主要内容,如果未能解决你的问题,请参考以下文章