Linux高性能服务器编程I/O复用的高级应用
Posted 李 ~
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux高性能服务器编程I/O复用的高级应用相关的知识,希望对你有一定的参考价值。
文章目录
一、基于 select 的非阻塞 connect
connect
系统调用的 man 手册中有如下一段内容:
EINPROGERESS
The socket is nonblocking and the connection cannot be completed immediately. It is possible to select(2) or poll(2) for completion by selecting the socket for writing. After select(2) indicates writability, use getsockopt(2) to read the SO_ERROR option at level SOL_SOCKET to determine whether connect() completed successfully (SO_ERROR is zero) or unsuccessfully (SO_ERROR is one of the usual error codes listed here, explaining the reason for the failure).
这段话描述了 connect
出错时的一种 errno
值:EINPROGRESS
。这种错误发生在对非阻塞了socket
调用了connect
,而连接又没有建立时。根据man文档的解释,在这种情况下我们可以调用select
、poll
等函数来监听这个连接失败的socket
上的可写事件。当select
、poll
等函数返回后,再利用getsockopt
来读取错误码并清除该socket
上的错误。如果错误码是0,表示连接成功,否则连接失败。
通过上面描述的非阻塞 connect
方式,我们就能同时发起多个连接并一起等待,下面是非阻塞connect
的一种实现方式。
首先是设置文件描述符为非阻塞状态:
// 设置文件描述符为非阻塞
int setnonblocking(int fd)
int old_opt = fcntl(fd, F_GETFL);
int new_opt = old_opt | O_NONBLOCK;
fcntl(fd, F_SETFL, new_opt);
// 返回以前的文件描述符状态
return old_opt;
先使用fcntl
函数获取并保存sockfd
描述符的状态,然后再使用fcntl
函数将其设置为非阻塞的,然后将原状态返回,便于在建立连接成功后恢复sockfd
的状态。
实现unblock_connect函数:
// 非阻塞连接函数,参数是服务器IP、端口以及超时时间。成功返回处于连接状态的socket,失败则返回-1.
int unblock_connect(const std::string &ip, uint16_t port, int time)
int ret = 0;
struct sockaddr_in addr;
bzero(&addr, sizeof(addr));
addr.sin_family = AF_INET;
inet_pton(AF_INET, ip.c_str(), &addr.sin_addr);
addr.sin_port = htons(port);
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
// 设置套接字描述符为非阻塞,并保存其原来的状态
int fdopt = setnonblocking(sockfd);
ret = connect(sockfd, (const sockaddr *)&addr, sizeof(addr));
if (ret == 0)
// 连接成功,将sockfd恢复原来的状态,然后返回
std::cout << "connect with server immediately!" << std::endl;
fcntl(sockfd, F_SETFL, fdopt);
return sockfd;
else if (errno != EINPROGRESS)
// 如果连接没有立即建立,那么只有errno为EINPROGRESS时才表示连接还在继续,否则出错返回
std::cout << "unblock connect not support!" << std::endl;
close(sockfd);
return -1;
else
// 继续连接
// 写文件描述符集
fd_set writefds;
FD_ZERO(&writefds);
// 将sockfd添加到writefds中
FD_SET(sockfd, &writefds);
timeval timeout;
timeout.tv_sec = time;
timeout.tv_usec = 0;
ret = select(sockfd + 1, nullptr, &writefds, nullptr, &timeout);
if (ret <= 0)
// select超时或出错,立即返回
std::cout << "connect timeout!" << std::endl;
close(sockfd);
return -1;
if (!FD_ISSET(sockfd, &writefds))
std::cout << "no events on sockfd found!" << std::endl;
close(sockfd);
return -1;
int error = 0;
socklen_t len = sizeof(error);
// 调用 getsockopt来获取并清除sockfd上的错误
if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &len) < 0)
std::cout << "get socket option failed!" << std::endl;
close(sockfd);
return -1;
// 错误码不为0表示连接出错
if (error != 0)
std::cout << "conntion failed after select with the error: " << error << std::endl;
close(sockfd);
return -1;
// 连接成功,将sockfd恢复原来的状态,然后返回
std::cout << "connect successfully after select with the sockfd: " << sockfd << std::endl;
fcntl(sockfd, F_SETFL, fdopt);
return sockfd;
在该函数中,首先创建socket
套接字,并将其设置为非阻塞的,然后调用connect
函数,如果连接成功,恢复sockfd
的原状态后返回;如果不能立即连接成功,只有返回的errno
是EINPROGRESS
才表示连接还在进行,否则直接出错返回。
当返回的errno
是EINPROGRESS
时,调用select
多路复用系统调用对sockfd
的写事件进行监听。监听成功后,调用getsockopt
函数来获取并清除sockfd
上的错误。如果错误码是0,则表示在调用select
函数后建立连接成功,恢复sockfd
的原状态后返回;否则出错返回。
main
函数逻辑:
int main(int argc, char* argv[])
if(argc != 3)
std::cout << "usage: " << argv[0] << " ip port" << std::endl;
return -1;
const std::string ip = argv[1];
uint16_t port = atoi(argv[2]);
int sockfd = unblock_connect(ip, port, 10);
if(sockfd < 0)
std::cout << "unblock connect failed!" << std::endl;
return -1;
else
std::cout << "连接成功,可以向服务器发起请求了..." << std::endl;
close(sockfd);
return 0;
二、基于 poll 的聊天室程序
这里基于poll系统调用实现一个简单的聊天室程序,以阐述如何使用I/O复用技术来同时处理网络连接给用户输入。该聊天室程序能让所有的用户同时在线群聊,它分为客户端和服务端两个部分。
其中客户端有两个功能:一是从标准输入终端读入用户数据,并将用户数据发送至服务器;二是往标准输出终端打印服务器发送给它的数据。服务器的功能是接收客户端数据,并将接收到的数据发送给每一个登录到该服务器上的客户端(数据发送者除外)。
效果展示如下:
以下是客户端和服务端的代码。
2.1 客户端
客户端程序使用 poll 同时监听用户输入和网络连接,并利用 splice 函数将用户输入的内容直接定向到网络连接上以发送之,从而实现数据的零拷贝,提高了程序的运行效率。客户端代码如下:
#include <iostream>
#include <string>
#include <cstring>
#include <cassert>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <poll.h>
#include <fcntl.h>
#define BUFFER_SIZE 1024
using namespace std;
int main(int argc, char *argv[])
if (argc != 3)
cout << "usage: " << argv[0] << " ip port" << endl;
return -1;
const string ip = argv[1];
uint16_t port = atoi(argv[2]);
sockaddr_in server_addr;
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
inet_pton(AF_INET, ip.c_str(), &server_addr.sin_addr);
server_addr.sin_port = htons(port);
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
assert(sockfd >= 0);
if (connect(sockfd, (const sockaddr *)&server_addr, sizeof(server_addr)) < 0)
cerr << "connect failed!" << endl;
close(sockfd);
return -1;
pollfd fds[2];
// 注册文件描述符0(标准输入) 和 sockfd 文件描述符的可读事件
fds[0].fd = 0;
fds[0].events = POLLIN;
fds[0].revents = 0;
fds[1].fd = sockfd;
fds[1].events = POLLIN | POLLRDHUP;
fds[1].revents = 0;
char read_buf[BUFFER_SIZE];
int pipefd[2];
int ret = 0;
ret = pipe(pipefd);
assert(ret != -1);
while (true)
ret = poll(fds, 2, -1);
if(ret < 0)
cout << "poll falied!" << endl;
break;
if(fds[1].revents & POLLRDHUP)
cout << "server close the connection!" << endl;
else if(fds[1].revents & POLLIN)
memset(read_buf, '\\0', BUFFER_SIZE);
ssize_t s = recv(fds[1].fd, read_buf, BUFFER_SIZE - 1, 0);
read_buf[s] = '\\0';
cout << read_buf << endl;
if(fds[0].revents & POLLIN)
//使用splice函数将用户输入的数据直接写到 sockfd 上(零拷贝)
ret = splice(0, nullptr, pipefd[1], nullptr, 32768, SPLICE_F_MORE | SPLICE_F_MOVE);
ret = splice(pipefd[0], nullptr, sockfd, nullptr, 32768, SPLICE_F_MORE | SPLICE_F_MOVE);
close(sockfd);
return 0;
2.2 服务器
服务器程序使用poll同时管理监听 socket 和连接 socket,并且使用牺牲空间换取时间的策略来提高服务器的性能。服务器代码如下:
#include <iostream>
#include <string>
#include <cstring>
#include <cassert>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <poll.h>
#define USER_LIMIT 5 // 最大用户数量
#define BUFFER_SIZE 1024 // 读缓冲区的大小
#define FD_LIMIT 65535 // 文件描述符数量限制
// 客户端数据
struct client_data
sockaddr_in address; // 客户端socket
char *write_buf; // 待写到客户端的数据的位置
char read_buf[BUFFER_SIZE]; // 从客户端读入的数据
;
using namespace std;
int setnonblocking(int fd)
int old_opt = fcntl(fd, F_GETFL);
int new_opt = old_opt | O_NONBLOCK;
fcntl(fd, F_SETFL, new_opt);
return old_opt;
int main(int argc, char *argv[])
if (argc != 3)
cout << "usage: " << argv[0] << " ip port" << endl;
return -1;
string ip = argv[1];
uint16_t port = atoi(argv[2]);
int ret = 0;
sockaddr_in addr;
bzero(&addr, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
inet_pton(AF_INET, ip.c_str(), &addr.sin_addr);
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
int opt = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
assert(listenfd >= 0);
ret = bind(listenfd, (const sockaddr *)&addr, sizeof(addr));
assert(ret != -1);
ret = listen(listenfd, 5);
assert(ret != -1);
// 创建users数组,分配FD_LIMIT个client_data对象。
// 每个可能的socket连接都可以获得这样一个对象,并且socket的值可以直接原来索引socket对应的client_data对象,
// 这是将socket和客户数据关联的简单而高效的方法。
client_data *users = new client_data[FD_LIMIT];
// 尽管分配了足够多的client_data对象,但为了提高poll的性能,仍然有必要限制用户数量
pollfd fds[USER_LIMIT + 1];
// 当前的用户数量
int user_count = 0;
// 初始化fds
for (int i = 0; i <= USER_LIMIT; ++i)
fds[i].fd = -1;
fds[i].events = 0;
// 添加listenfd到fds
fds[0].fd = listenfd;
fds[0].events = POLLIN | POLLERR;
fds[0].revents = 0;
while (true)
ret = poll(fds, user_count + 1, -1);
if (ret < 0)
cerr << "poll failed!" << endl;
break;
for (int i = 0; i <= user_count; ++i)
// 如果此时是监听套接字就绪,则处理新连接
if ((fds[i].fd == listenfd) && (fds[i].revents & POLLIN))
sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);
int connfd = accept(listenfd, (sockaddr *)&client_addr, &len);
if (connfd < 0)
cerr << "accept failed! error: " << errno << "msg: " << strerror(errno) << endl;
continue;
// 如果请求太多,则关闭该连接
if (user_count >= USER_LIMIT)
const string info = "too many users\\n";
cout << info;
send(connfd, info.c_str(), info.size(), 0);
close(connfd);
// 对于新连接,同时修改users和fds数组,以保证users[connfd] 对应于 新连接文件描述符connfd的客户端数据
++user_count;
users[connfd].address = client_addr;
setnonblocking(connfd);
fds[user_count].fd = connfd;
fds[user_count].events = POLLIN | POLLERR | POLLRDHUP;
fds[user_count].revents = 0;
cout << "comes a new user, now have " << user_count << " users!" << endl;
// 如果connfd出错
else if (fds[i].revents & POLLERROpenResty搭建高性能服务端
Socket编程
Linux Socket编程领域为了处理大量连接请求场景,需要使用非阻塞I/O和复用,select
、poll
、epoll
是Linux API提供的I/O复用方式,自从Linux2.6中加入了epoll之后,高性能服务器领域得到广泛的应用,Nignx就是使用epoll
来实现I/O复用支持高并发。
对于“高性能”服务端而言,我们所关注的并不是语言的性能,而是缓存和语言支持异步非阻塞。
缓存
针对缓存要明白通信速度的快慢顺序
- 内存>SSD>机械磁盘
- 本机>网络
- 进程内 > 进程间
缓存系统的目标是希望在进程内的命中率是最高的,那么此时缓存系统整体的效率也是最高的。
异步非阻塞
希望访问数据库、访问网络,访问一些比较慢的IO设备时,不要在等待上耗费大量时间。而是使用事件驱动的方式,当系统完成某项任务后再来通知我们。这样就可以将服务器CPU的空闲资源,用来服务客户端连接。
OpenResty
OpenResty是基于Ngnix和Lua的高性能web平台,内部集成精良的LUa库、第三方模块、依赖项。用于方便搭建能够处理高并发、扩展性极高的动态web应用、web服务、动态网关。可以使用Lua脚本调用Ngnix支持的C以及Lua模块,快速构建10K~1000K单机并发连接的高性能web应用系统。OpenResty的目标是让web服务直接运行在Nginx服务内部,利用Ngnix的非阻塞IO模型,对HTTP客户端请求和后端DB进行一致的高性能响应。
OpenResty的出现可以说是颠覆了高性能服务端的开发模式。OpenResty实际上是Nginx+LuaJIT的完美组合。
由于Nginx采用的是master-worker
模型,也就是一个master
主进程管理多个worker
进程,基本的事件处理都是放在worker
中,master
仅负责一些全剧初始化,以及对worker
的管理。在OpenResty中,每个worker
使用一个LuaVM,每个请求被分配到worker
时,将在这个LuaVM
中创建一个coroutine
协程。协程之间数据隔离,每个协程具有独立的全局变量_G
。
Lua中的协程和多线程下的线程类似,都有自己的堆栈、局部变量、指令指针...,但是和其他协程程序共享全局变量等信息。线程和协程主要不同在于:多处理器的情况下,概念上来说多线程是同时运行多个线程,而协程是通过代码来完成协程的切换,任何时刻只有一个协程程序在运行。并且这个在运行的协程只有明确被要求挂起时才会被挂起。
根据实际测试,OpenResty性能接近于Nginx 性能之王c module,甚至超过。
OpenResty 架构
- 负载均衡
LVS+HAProxy将流量转发给核心Nginx1和Nginx2,即实现了流量的负载均衡。
- 单机闭环
所有想要的数据都能从本服务器直接获取,大多数时候无需通过网络或去其他服务器获取。
- 分布式闭环
单机闭环会遇到2个主要问题
-
数据不一致
例如没有主从架构导致不同服务器数据不一致
-
遇到存储瓶颈
磁盘或内存遇到天花板
解决数据不一致比较好的办法是采用主从或分布式集中存储,而遇到存储瓶颈就需要进行按业务键进行分片,将数据分散到多台服务器。
- 接入网关
接入网关又叫接入层,即接收流量的入口,在入口处做如下事情:
OpenResty环境搭建
安装前准备,必须安装perl
、libpcre
、libssl
库。
# 从系统路径中查看必备库是否已经安装
$ sudo ldconfig -v
# 安装必备库
$ sudo apt install libpcre3-dev libssl-dev perl make build-essential curl libreadline-dev libncurses5-dev
下载并解压OpenResty后进入其目录
$ wget https://openresty.org/download/ngx_openresty-1.13.6.1.tar.gz
$ tar -zxvf ngx_openresty-1.13.6.1.tar.gz
$ mv openresty-1.13.6.1 openresty
$ cd openresty
$ ./configure
默认会被安装到/usr/local/openresty
目录下
# 编译并安装
$ sudo make && make install
$ cd /usr/local/openresty
启动Nginx
$ sudo /usr/local/openresty/nginx/sbin/nginx
$ ps -ef | grep nginx
$ service nginx status
Nginx启动若出现
nginx: [emerg] bind() to 0.0.0.0:80 failed (98: Address already in use)
nginx: [emerg] still could not bind()
说明80端口并占用,查看80端口被占用的端口并重启。原因在于nginx先监听了ipv4的80端口之后又监听了ipv6的80端口,于是就重复占用了。
$ sudo netstat -ntlp | grep 80
$ sudo killall -9 nginx
重新编辑Nginx配置文件
$ sudo vim /etc/nginx/conf/nginx.conf
listen 80;
listen [::]:80 ipv6only=on default_server;
使用curl
工具或在浏览器访问默认80端口
$ curl 127.0.0.1
浏览器输入http://127.0.0.1/
将Nginx工具配置到当前用户的系统环境变量中
$ sudo vim ~/.bashrc
export PATH=$PATH:/usr/local/openresty/nginx/sbin
$ source ~./bashrc
$ cd ~
$ nginx -s reload
nginx: [alert] kill(12267, 1) failed (1: Operation not permitted)
开发文档
ubuntu 安装 vcode 或 sublime text 编辑器
content_by_lua
$ vim /usr/local/openresty/nginx/conf/nginx.conf
location /test {
default_type text/html;
content_by_lua ‘ngx.say("hello openresty")‘;
}
# 重启Nginx
$ /usr/local/openresty/nginx/sbin/nginx -s reload
# 浏览器访问 127.0.0.1/test
content_by_lua_file
$ vim nginx.conf
location /test {
content_by_lua_file ‘html/test.lua‘;
}
$ vim ../html/test.lua
ngx.say("hello lua")
$ sudo /usr/local/nginx/sbin/nginx -s reload
$ curl 127.0.0.1/test
hello lua
为避免每次修改都需要重启Nginx,可在Nginx的server
选项中配置lua_code_cache
选项。
$ vim nginx.conf
server{
lua_code_cache off;
location /test{
content_by_lua_file ‘html/test.lua‘;
}
}
$ sudo /usr/local/openresty/nginx/sbin/nginx -s reload
nginx: [alert] lua_code_cache is off; this will hurt performance in /usr/local/openresty/nginx/conf/nginx.conf:48
注意lua_code_cache off;
是会引擎Nginx的性能的,在生产环境中是需要将其开启的。
小节
在OpenResty中开发是分为两步的,第一步是修改Nginx配置,第二步是使用Lua开发自己的脚本。
OpenResty入门
参考资料
创建工作目录
OpenResty安装之后就有配置文件及相关目录,为了工作目录和安装目录互不干扰,另外创建OpenResty工作目录,并另写配置。
$ mkdir -p ~/openresty/test/logs ~/openresty/test/conf
$ vim ~/openresty/test/conf/nginx.conf
# 设置Nginx worker工作进程数量,即CPU核数。
worker_processes 1;
# 设置错误日志文件路径
error_log logs/error.log;
# 配置Nginx服务器与用户的网络连接
events{
# 设置每个工作进程的最大连接数
worker_connections 10224;
}
http{
# 虚拟机主机块定义
server{
# 监听端口
listen 8001;
# 配置请求的路由
location /{
default_type text/html;
content_by_lua_block{
ngx.say("hello world");
}
}
}
}
$ nginx -p ~/openresty/test
$ curl 127.0.0.1:8001
hello world
$ vim nginx.conf
location /test{
content_by_lua_file "lua/test.lua";
}
$ cd .. && mkdir lua && cd lua
$ vim test.lua
local args = ngx.req.get_uri_args()
local salt = args.salt
if not salt then
ngx.exit(ngx.HTTP_BAD_REQUEST)
end
local md5str = ngx.md5(ngx.time()..salt)
ngx.say(md5str)
$ sudo /usr/local/openresty/nginx/sbin/nginx -s reload
$ curl -i 127.0.0.1/test?salt=lua
HTTP/1.1 200 OK
Server: openresty/1.13.6.2
Date: Sun, 27 Jan 2019 10:07:17 GMT
Content-Type: application/octet-stream
Transfer-Encoding: chunked
Connection: keep-alive
b55b77f75e46b96b11778ca7edfe8d55
若代码中出现错误则需要直接查看Nginx的错误日志进行查看
$ vim nginx/logs/error.log
2019/01/27 17:37:15 [error] 15764#0: *6 failed to load external Lua file "/usr/local/openresty/nginx/test.lua": cannot open /usr/local/openresty/nginx/test.lua: No such file or...
Windows系统下查看Nginx进程
λ tasklist /fi "imagename eq nginx.exe"
映像名称 PID 会话名 会话# 内存使用
========================= ======== ================ =========== ============
nginx.exe 9072 Console 1 7,840 K
nginx.exe 7692 Console 1 12,304 K
nginx.exe 8120 Console 1 7,840 K
nginx.exe 4552 Console 1 12,188 K
nginx.exe 9588 Console 1 7,828 K
nginx.exe 6256 Console 1 12,216 K
nginx.exe 7308 Console 1 7,828 K
nginx.exe 10192 Console 1 12,212 K
λ taskkill /im nginx.exe /f
成功: 已终止进程 "nginx.exe",其 PID 为 9072。
ngx lua API
参考资料
以上是关于Linux高性能服务器编程I/O复用的高级应用的主要内容,如果未能解决你的问题,请参考以下文章