Linux高性能服务器编程I/O复用的高级应用

Posted 李 ~

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux高性能服务器编程I/O复用的高级应用相关的知识,希望对你有一定的参考价值。

文章目录


一、基于 select 的非阻塞 connect

connect系统调用的 man 手册中有如下一段内容:

EINPROGERESS
The socket is nonblocking and the connection cannot be completed immediately. It is possible to select(2) or poll(2) for completion by selecting the socket for writing. After select(2) indicates writability, use getsockopt(2) to read the SO_ERROR option at level SOL_SOCKET to determine whether connect() completed successfully (SO_ERROR is zero) or unsuccessfully (SO_ERROR is one of the usual error codes listed here, explaining the reason for the failure).

这段话描述了 connect 出错时的一种 errno 值:EINPROGRESS。这种错误发生在对非阻塞了socket调用了connect,而连接又没有建立时。根据man文档的解释,在这种情况下我们可以调用selectpoll等函数来监听这个连接失败的socket上的可写事件。当selectpoll等函数返回后,再利用getsockopt来读取错误码并清除该socket上的错误。如果错误码是0,表示连接成功,否则连接失败。

通过上面描述的非阻塞 connect 方式,我们就能同时发起多个连接并一起等待,下面是非阻塞connect的一种实现方式。

首先是设置文件描述符为非阻塞状态:

// 设置文件描述符为非阻塞
int setnonblocking(int fd)

    int old_opt = fcntl(fd, F_GETFL);
    int new_opt = old_opt | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_opt);
    // 返回以前的文件描述符状态
    return old_opt;

先使用fcntl函数获取并保存sockfd描述符的状态,然后再使用fcntl函数将其设置为非阻塞的,然后将原状态返回,便于在建立连接成功后恢复sockfd的状态。

实现unblock_connect函数:

// 非阻塞连接函数,参数是服务器IP、端口以及超时时间。成功返回处于连接状态的socket,失败则返回-1.
int unblock_connect(const std::string &ip, uint16_t port, int time)

    int ret = 0;
    struct sockaddr_in addr;
    bzero(&addr, sizeof(addr));
    addr.sin_family = AF_INET;
    inet_pton(AF_INET, ip.c_str(), &addr.sin_addr);
    addr.sin_port = htons(port);

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    // 设置套接字描述符为非阻塞,并保存其原来的状态
    int fdopt = setnonblocking(sockfd);
    ret = connect(sockfd, (const sockaddr *)&addr, sizeof(addr));
    if (ret == 0)
    
        // 连接成功,将sockfd恢复原来的状态,然后返回
        std::cout << "connect with server immediately!" << std::endl;
        fcntl(sockfd, F_SETFL, fdopt);
        return sockfd;
    
    else if (errno != EINPROGRESS)
    
        // 如果连接没有立即建立,那么只有errno为EINPROGRESS时才表示连接还在继续,否则出错返回
        std::cout << "unblock connect not support!" << std::endl;
        close(sockfd);
        return -1;
    
    else
    
        // 继续连接

        // 写文件描述符集
        fd_set writefds;
        FD_ZERO(&writefds);
        // 将sockfd添加到writefds中
        FD_SET(sockfd, &writefds);
        timeval timeout;
        timeout.tv_sec = time;
        timeout.tv_usec = 0;

        ret = select(sockfd + 1, nullptr, &writefds, nullptr, &timeout);
        if (ret <= 0)
        
            // select超时或出错,立即返回
            std::cout << "connect timeout!" << std::endl;
            close(sockfd);
            return -1;
        

        if (!FD_ISSET(sockfd, &writefds))
        
            std::cout << "no events on sockfd found!" << std::endl;
            close(sockfd);
            return -1;
        

        int error = 0;
        socklen_t len = sizeof(error);
        // 调用 getsockopt来获取并清除sockfd上的错误
        if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &len) < 0)
        
            std::cout << "get socket option failed!" << std::endl;
            close(sockfd);
            return -1;
        

        // 错误码不为0表示连接出错
        if (error != 0)
        
            std::cout << "conntion failed after select with the error: " << error << std::endl;
            close(sockfd);
            return -1;
        

        // 连接成功,将sockfd恢复原来的状态,然后返回
        std::cout << "connect successfully after select with the sockfd: " << sockfd << std::endl;
        fcntl(sockfd, F_SETFL, fdopt);
        return sockfd;
    

在该函数中,首先创建socket套接字,并将其设置为非阻塞的,然后调用connect函数,如果连接成功,恢复sockfd的原状态后返回;如果不能立即连接成功,只有返回的errnoEINPROGRESS才表示连接还在进行,否则直接出错返回。

当返回的errnoEINPROGRESS时,调用select多路复用系统调用对sockfd的写事件进行监听。监听成功后,调用getsockopt函数来获取并清除sockfd上的错误。如果错误码是0,则表示在调用select函数后建立连接成功,恢复sockfd的原状态后返回;否则出错返回。

main函数逻辑:

int main(int argc, char* argv[])

    if(argc != 3)
    
        std::cout << "usage: " << argv[0] << " ip port" << std::endl;
        return -1;
    

    const std::string ip = argv[1];
    uint16_t port = atoi(argv[2]);

    int sockfd = unblock_connect(ip, port, 10);
    if(sockfd < 0)
    
        std::cout << "unblock connect failed!" << std::endl;
        return -1;
    
    else
    
        std::cout << "连接成功,可以向服务器发起请求了..." << std::endl;
    

    close(sockfd);
    return 0;

二、基于 poll 的聊天室程序

这里基于poll系统调用实现一个简单的聊天室程序,以阐述如何使用I/O复用技术来同时处理网络连接给用户输入。该聊天室程序能让所有的用户同时在线群聊,它分为客户端和服务端两个部分。

其中客户端有两个功能:一是从标准输入终端读入用户数据,并将用户数据发送至服务器;二是往标准输出终端打印服务器发送给它的数据。服务器的功能是接收客户端数据,并将接收到的数据发送给每一个登录到该服务器上的客户端(数据发送者除外)。

效果展示如下:

以下是客户端和服务端的代码。

2.1 客户端

客户端程序使用 poll 同时监听用户输入和网络连接,并利用 splice 函数将用户输入的内容直接定向到网络连接上以发送之,从而实现数据的零拷贝,提高了程序的运行效率。客户端代码如下:

#include <iostream>
#include <string>
#include <cstring>
#include <cassert>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <poll.h>
#include <fcntl.h>

#define BUFFER_SIZE 1024

using namespace std;

int main(int argc, char *argv[])

    if (argc != 3)
    
        cout << "usage: " << argv[0] << " ip port" << endl;
        return -1;
    

    const string ip = argv[1];
    uint16_t port = atoi(argv[2]);

    sockaddr_in server_addr;
    bzero(&server_addr, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    inet_pton(AF_INET, ip.c_str(), &server_addr.sin_addr);
    server_addr.sin_port = htons(port);

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(sockfd >= 0);

    if (connect(sockfd, (const sockaddr *)&server_addr, sizeof(server_addr)) < 0)
    
        cerr << "connect failed!" << endl;
        close(sockfd);
        return -1;
    

    pollfd fds[2];

    // 注册文件描述符0(标准输入) 和 sockfd 文件描述符的可读事件
    fds[0].fd = 0;
    fds[0].events = POLLIN;
    fds[0].revents = 0;

    fds[1].fd = sockfd;
    fds[1].events = POLLIN | POLLRDHUP;
    fds[1].revents = 0;

    char read_buf[BUFFER_SIZE];
    int pipefd[2];
    int ret = 0;
    ret = pipe(pipefd);
    assert(ret != -1);


    while (true)
    
        ret = poll(fds, 2, -1);
        if(ret < 0)
        
            cout << "poll falied!" << endl;
            break;
        

        if(fds[1].revents & POLLRDHUP)
        
            cout << "server close the connection!" << endl;
        
        else if(fds[1].revents & POLLIN)
        
            memset(read_buf, '\\0', BUFFER_SIZE);
            ssize_t s = recv(fds[1].fd, read_buf, BUFFER_SIZE - 1, 0);
            read_buf[s] = '\\0';
            cout << read_buf << endl;
        

        if(fds[0].revents & POLLIN)
        
            //使用splice函数将用户输入的数据直接写到 sockfd 上(零拷贝)
            ret = splice(0, nullptr, pipefd[1], nullptr, 32768, SPLICE_F_MORE | SPLICE_F_MOVE);
            ret = splice(pipefd[0], nullptr, sockfd, nullptr, 32768, SPLICE_F_MORE | SPLICE_F_MOVE);
        
    

    close(sockfd);
    return 0;

2.2 服务器

服务器程序使用poll同时管理监听 socket 和连接 socket,并且使用牺牲空间换取时间的策略来提高服务器的性能。服务器代码如下:

#include <iostream>
#include <string>
#include <cstring>
#include <cassert>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <poll.h>

#define USER_LIMIT 5     // 最大用户数量
#define BUFFER_SIZE 1024 // 读缓冲区的大小
#define FD_LIMIT 65535   // 文件描述符数量限制

// 客户端数据
struct client_data

    sockaddr_in address;        // 客户端socket
    char *write_buf;            // 待写到客户端的数据的位置
    char read_buf[BUFFER_SIZE]; // 从客户端读入的数据
;

using namespace std;

int setnonblocking(int fd)

    int old_opt = fcntl(fd, F_GETFL);
    int new_opt = old_opt | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_opt);
    return old_opt;


int main(int argc, char *argv[])

    if (argc != 3)
    
        cout << "usage: " << argv[0] << " ip port" << endl;
        return -1;
    

    string ip = argv[1];
    uint16_t port = atoi(argv[2]);

    int ret = 0;
    sockaddr_in addr;
    bzero(&addr, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    inet_pton(AF_INET, ip.c_str(), &addr.sin_addr);

    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    int opt = 1;
    setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
    assert(listenfd >= 0);

    ret = bind(listenfd, (const sockaddr *)&addr, sizeof(addr));
    assert(ret != -1);

    ret = listen(listenfd, 5);
    assert(ret != -1);

    // 创建users数组,分配FD_LIMIT个client_data对象。
    // 每个可能的socket连接都可以获得这样一个对象,并且socket的值可以直接原来索引socket对应的client_data对象,
    // 这是将socket和客户数据关联的简单而高效的方法。
    client_data *users = new client_data[FD_LIMIT];

    // 尽管分配了足够多的client_data对象,但为了提高poll的性能,仍然有必要限制用户数量
    pollfd fds[USER_LIMIT + 1];
    // 当前的用户数量
    int user_count = 0;
    // 初始化fds
    for (int i = 0; i <= USER_LIMIT; ++i)
    
        fds[i].fd = -1;
        fds[i].events = 0;
    

    // 添加listenfd到fds
    fds[0].fd = listenfd;
    fds[0].events = POLLIN | POLLERR;
    fds[0].revents = 0;

    while (true)
    
        ret = poll(fds, user_count + 1, -1);
        if (ret < 0)
        
            cerr << "poll failed!" << endl;
            break;
        

        for (int i = 0; i <= user_count; ++i)
        
            // 如果此时是监听套接字就绪,则处理新连接
            if ((fds[i].fd == listenfd) && (fds[i].revents & POLLIN))
            
                sockaddr_in client_addr;
                socklen_t len = sizeof(client_addr);
                int connfd = accept(listenfd, (sockaddr *)&client_addr, &len);
                if (connfd < 0)
                
                    cerr << "accept failed! error: " << errno << "msg: " << strerror(errno) << endl;
                    continue;
                

                // 如果请求太多,则关闭该连接
                if (user_count >= USER_LIMIT)
                
                    const string info = "too many users\\n";
                    cout << info;
                    send(connfd, info.c_str(), info.size(), 0);
                    close(connfd);
                

                // 对于新连接,同时修改users和fds数组,以保证users[connfd] 对应于 新连接文件描述符connfd的客户端数据
                ++user_count;
                users[connfd].address = client_addr;
                setnonblocking(connfd);
                fds[user_count].fd = connfd;
                fds[user_count].events = POLLIN | POLLERR | POLLRDHUP;
                fds[user_count].revents = 0;

                cout << "comes a new user, now have " << user_count << " users!" << endl;
            
            // 如果connfd出错
            else if (fds[i].revents & POLLERROpenResty搭建高性能服务端
 

Socket编程

Linux Socket编程领域为了处理大量连接请求场景,需要使用非阻塞I/O和复用,selectpollepoll是Linux API提供的I/O复用方式,自从Linux2.6中加入了epoll之后,高性能服务器领域得到广泛的应用,Nignx就是使用epoll来实现I/O复用支持高并发。

对于“高性能”服务端而言,我们所关注的并不是语言的性能,而是缓存和语言支持异步非阻塞。

缓存

针对缓存要明白通信速度的快慢顺序

  • 内存>SSD>机械磁盘
  • 本机>网络
  • 进程内 > 进程间

缓存系统的目标是希望在进程内的命中率是最高的,那么此时缓存系统整体的效率也是最高的。

异步非阻塞

希望访问数据库、访问网络,访问一些比较慢的IO设备时,不要在等待上耗费大量时间。而是使用事件驱动的方式,当系统完成某项任务后再来通知我们。这样就可以将服务器CPU的空闲资源,用来服务客户端连接。

OpenResty

OpenResty是基于Ngnix和Lua的高性能web平台,内部集成精良的LUa库、第三方模块、依赖项。用于方便搭建能够处理高并发、扩展性极高的动态web应用、web服务、动态网关。可以使用Lua脚本调用Ngnix支持的C以及Lua模块,快速构建10K~1000K单机并发连接的高性能web应用系统。OpenResty的目标是让web服务直接运行在Nginx服务内部,利用Ngnix的非阻塞IO模型,对HTTP客户端请求和后端DB进行一致的高性能响应。

OpenResty的出现可以说是颠覆了高性能服务端的开发模式。OpenResty实际上是Nginx+LuaJIT的完美组合。

 技术图片
 
 
OpenResty工作方式

由于Nginx采用的是master-worker模型,也就是一个master主进程管理多个worker进程,基本的事件处理都是放在worker中,master仅负责一些全剧初始化,以及对worker的管理。在OpenResty中,每个worker使用一个LuaVM,每个请求被分配到worker时,将在这个LuaVM中创建一个coroutine协程。协程之间数据隔离,每个协程具有独立的全局变量_G

Lua中的协程和多线程下的线程类似,都有自己的堆栈、局部变量、指令指针...,但是和其他协程程序共享全局变量等信息。线程和协程主要不同在于:多处理器的情况下,概念上来说多线程是同时运行多个线程,而协程是通过代码来完成协程的切换,任何时刻只有一个协程程序在运行。并且这个在运行的协程只有明确被要求挂起时才会被挂起。

根据实际测试,OpenResty性能接近于Nginx 性能之王c module,甚至超过。

OpenResty 架构

  • 负载均衡

LVS+HAProxy将流量转发给核心Nginx1和Nginx2,即实现了流量的负载均衡。

 技术图片
 
 
负载均衡
  • 单机闭环

所有想要的数据都能从本服务器直接获取,大多数时候无需通过网络或去其他服务器获取。

 技术图片
 
单机闭环
  • 分布式闭环

单机闭环会遇到2个主要问题

  1. 数据不一致
    例如没有主从架构导致不同服务器数据不一致

  2. 遇到存储瓶颈
    磁盘或内存遇到天花板

解决数据不一致比较好的办法是采用主从或分布式集中存储,而遇到存储瓶颈就需要进行按业务键进行分片,将数据分散到多台服务器。

 技术图片
 
分布式闭环
  • 接入网关

接入网关又叫接入层,即接收流量的入口,在入口处做如下事情:

 技术图片
 
接入网关

OpenResty环境搭建

安装前准备,必须安装perllibpcrelibssl库。

# 从系统路径中查看必备库是否已经安装
$ sudo ldconfig -v

# 安装必备库
$ sudo apt install libpcre3-dev libssl-dev perl make build-essential curl libreadline-dev libncurses5-dev

下载并解压OpenResty后进入其目录

$ wget https://openresty.org/download/ngx_openresty-1.13.6.1.tar.gz
$ tar -zxvf ngx_openresty-1.13.6.1.tar.gz
$ mv openresty-1.13.6.1 openresty
$ cd openresty
$ ./configure

默认会被安装到/usr/local/openresty目录下

# 编译并安装
$ sudo make && make install
$ cd /usr/local/openresty

启动Nginx

$ sudo /usr/local/openresty/nginx/sbin/nginx
$ ps -ef | grep nginx
$ service nginx status

Nginx启动若出现

nginx: [emerg] bind() to 0.0.0.0:80 failed (98: Address already in use)
nginx: [emerg] still could not bind()

说明80端口并占用,查看80端口被占用的端口并重启。原因在于nginx先监听了ipv4的80端口之后又监听了ipv6的80端口,于是就重复占用了。

$ sudo netstat -ntlp | grep 80
$ sudo killall -9 nginx

重新编辑Nginx配置文件

$ sudo vim /etc/nginx/conf/nginx.conf

listen 80;
listen [::]:80 ipv6only=on default_server;

使用curl工具或在浏览器访问默认80端口

$ curl 127.0.0.1

浏览器输入http://127.0.0.1/

将Nginx工具配置到当前用户的系统环境变量中

$ sudo vim ~/.bashrc
export PATH=$PATH:/usr/local/openresty/nginx/sbin
$ source ~./bashrc
$ cd ~
$ nginx -s reload
nginx: [alert] kill(12267, 1) failed (1: Operation not permitted)

开发文档

ubuntu 安装 vcode 或 sublime text 编辑器

content_by_lua

$ vim /usr/local/openresty/nginx/conf/nginx.conf
location /test {
  default_type text/html;
  content_by_lua ‘ngx.say("hello openresty")‘;
}

# 重启Nginx
$ /usr/local/openresty/nginx/sbin/nginx -s reload

# 浏览器访问 127.0.0.1/test

content_by_lua_file

$ vim nginx.conf
location /test {
  content_by_lua_file ‘html/test.lua‘;
}
$ vim ../html/test.lua
ngx.say("hello lua")
$ sudo /usr/local/nginx/sbin/nginx -s reload
$ curl 127.0.0.1/test
hello lua

为避免每次修改都需要重启Nginx,可在Nginx的server选项中配置lua_code_cache选项。

$ vim nginx.conf
server{
  lua_code_cache off;
  location /test{
    content_by_lua_file ‘html/test.lua‘;
  }
}
$ sudo /usr/local/openresty/nginx/sbin/nginx -s reload
nginx: [alert] lua_code_cache is off; this will hurt performance in /usr/local/openresty/nginx/conf/nginx.conf:48

注意lua_code_cache off;是会引擎Nginx的性能的,在生产环境中是需要将其开启的。

小节

在OpenResty中开发是分为两步的,第一步是修改Nginx配置,第二步是使用Lua开发自己的脚本。

OpenResty入门

参考资料

创建工作目录

OpenResty安装之后就有配置文件及相关目录,为了工作目录和安装目录互不干扰,另外创建OpenResty工作目录,并另写配置。

$ mkdir -p ~/openresty/test/logs ~/openresty/test/conf
$ vim ~/openresty/test/conf/nginx.conf
# 设置Nginx worker工作进程数量,即CPU核数。
worker_processes 1;

# 设置错误日志文件路径
error_log logs/error.log;
# 配置Nginx服务器与用户的网络连接
events{
    # 设置每个工作进程的最大连接数
    worker_connections 10224;
}

http{
    # 虚拟机主机块定义
    server{
        # 监听端口
        listen 8001;
        # 配置请求的路由
        location /{
            default_type text/html;
            content_by_lua_block{
                ngx.say("hello world");
            }
        }
    }
}
$ nginx -p ~/openresty/test
$ curl 127.0.0.1:8001
hello world
$ vim nginx.conf
location /test{
  content_by_lua_file "lua/test.lua";
}
$ cd .. && mkdir lua && cd lua
$ vim test.lua
local args = ngx.req.get_uri_args()
local salt = args.salt
if not salt then
  ngx.exit(ngx.HTTP_BAD_REQUEST)
end
local md5str = ngx.md5(ngx.time()..salt)
ngx.say(md5str)

$ sudo /usr/local/openresty/nginx/sbin/nginx -s reload
$ curl -i 127.0.0.1/test?salt=lua
HTTP/1.1 200 OK
Server: openresty/1.13.6.2
Date: Sun, 27 Jan 2019 10:07:17 GMT
Content-Type: application/octet-stream
Transfer-Encoding: chunked
Connection: keep-alive

b55b77f75e46b96b11778ca7edfe8d55

若代码中出现错误则需要直接查看Nginx的错误日志进行查看

$ vim nginx/logs/error.log
2019/01/27 17:37:15 [error] 15764#0: *6 failed to load external Lua file "/usr/local/openresty/nginx/test.lua": cannot open /usr/local/openresty/nginx/test.lua: No such file or...

Windows系统下查看Nginx进程

λ tasklist /fi "imagename eq nginx.exe"

映像名称                       PID 会话名              会话#       内存使用
========================= ======== ================ =========== ============
nginx.exe                     9072 Console                    1      7,840 K
nginx.exe                     7692 Console                    1     12,304 K
nginx.exe                     8120 Console                    1      7,840 K
nginx.exe                     4552 Console                    1     12,188 K
nginx.exe                     9588 Console                    1      7,828 K
nginx.exe                     6256 Console                    1     12,216 K
nginx.exe                     7308 Console                    1      7,828 K
nginx.exe                    10192 Console                    1     12,212 K

λ taskkill /im nginx.exe /f
成功: 已终止进程 "nginx.exe",其 PID 为 9072。

ngx lua API

参考资料

 

以上是关于Linux高性能服务器编程I/O复用的高级应用的主要内容,如果未能解决你的问题,请参考以下文章

Linux高性能服务编程(I/O复用)

《Linux高性能服务器编程》学习总结——I/O复用

《Linux高性能服务器编程》学习总结——高级I/O函数

Linux高性能server编程——I/O复用

OpenResty搭建高性能服务端

OpenResty搭建高性能服务端