一起来写web server 06 -- 单线程非阻塞IO版本
Posted lishuhuakai
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一起来写web server 06 -- 单线程非阻塞IO版本相关的知识,希望对你有一定的参考价值。
阻塞IO
的效率是在是低下,你如果要写高性能的web server
的话,你必须使用非阻塞IO
.
非阻塞IO的读写
在谈到非阻塞IO之前,必须先谈一谈阻塞IO,在网络编程中,我们假设有一个监听套接字的sockfd
,这个sockfd
是你调用了socket
,listen
, bind
这一大票的函数得到的一个文件描述符.其实它默认就是阻塞的,具体的表现是:
使用
accept
函数监听sockfd
时,如果没有连接到来,这个函数会一直阻塞在那里.对
sockfd
调用recv
函数的时候,如果对方还没有发送数据过来,这个函数也会一直阻塞.对
sockfd
执行write
操作的时候,如果tcp
缓冲区已经满了,那么write
函数也会阻塞在那里,一直到数据写完了才返回.......
非阻塞的IO确实没有什么不好的,编程简单,逻辑清晰,但是在今天的话,有一个致命的缺点,那就是资源的利用率不高,具体来说,就是通过阻塞IO编写的服务端程序只有很少的时间在工作,大部分时间都处在阻塞状态.
为了提高程序的性能,我们应该充分利用线程阻塞的那段时间,毕竟那么长的时间里,我们可以干很多的事情,所以聪明的程序员弄出了个非阻塞IO的玩意,具体而言,就是一旦对一个文件描述符设置了非阻塞,比如说前面的sockfd
,我们调用accept
, write
, recv
等函数的时候,如果碰到数据没到,缓冲区已满这类事情,不会像之前那样阻塞在那里,这些函数会立即返回,并且设置一些标志让调用者知道.
单纯的非阻塞IO其实并没有多么大的用处,假如你要写向sockfd
写入数据,如果sockfd
是非阻塞的话,你大概会这样编程:
while (true) {
ret = write(sockfd, buf, sizeof(buf));
if 数据写完了
break;
if 资源不可用 or 还有剩余数据没写完
continue;
}
如果我们用上面的方法来编写代码的话,还不如用直接用阻塞IO
呢,轮询可比阻塞傻逼多了.
非阻塞IO加上epoll
, select
, poll
这些IO多路复用机制,我们才可以高效地利用以前被阻塞的时光.高效确实非常高效,不过我想说的一点是 –
非阻塞IO加上这些IO复用机制,会使得代码的复杂度急剧上升.如果你用这些机制来编写网络程序的话,更加要小心,因为这些东西调试起来并不是那么方便,代码很可能死在一个微小而不易察觉的点上.
我顺带扯一下这些东西复杂在哪里:
首先,由于监听套接字的文件描述符
listenfd
变得非阻塞,所以你要监听这个描述符上的可读事件,当然,这很简单.如果连接到来了,新建连接,你要设置新来的连接的文件描述符的监听,非阻塞
IO
就是这么个事情,拉弓没有回头箭,一旦用了,几乎所有的套接字的描述符都要设置为非阻塞.这也不难.一旦对方发送了数据,你要读取,像在
epoll
里面,为了提升效率,我们一般都会设置ET
模式,这意味着你要一次性读完fd
上的数据.不能说你想读多少就读多少,边读边处理,这样的话,你或许不得不用一个buf
来缓存读到的数据,并且记录你已经处理了的数据的数目.系统的
tcp缓冲区
很容易就填满了,因为你的IO
是非阻塞的,所以一旦发生了缓冲区满这种事情,你不大可能等待到缓冲区可用,所以你也要监听这个套接字的可写事件,并且为了保存数据,你要自己弄一个buf
,要记录下来已经写了多少,还有多少数据要写,下次写的时候可以从上次中断的地方开始.如果
epoll
,非阻塞IO
碰上了多线程,复杂度还会上升.因为我们还必须处理一些竞争条件.
非阻塞IO的核心思想是避免阻塞在
read
或write
或其他IO系统调用上,这样可以最大限度地复用thread-of-control
,让一个线程能够服务于多个socket
连接,IO
线程只能阻塞在IO multiplexing
函数上,如select
/poll
/epoll_wait
.这样一来,应用层的缓冲是必需的,每个TCP socket
都要有stateful
的input buffer
和output buffer
. – linux多线程服务端编程
从前面的分析可以看得到,为了提高效率,我们真的牺牲了很多.要做的工作也增加了很多.当然,我可不想讲什么epoll
,其实我更想谈的是非阻塞IO
在write
, read
这些函数里的一些表现.
write函数
首先是write
函数,write
函数如果返回的值是大于等于0的话,那表示的是已经写入的字节数目.返回-1
的话,一般会设置errno
,通过判别errno
我们可以知道到底是因为什么出错.
read函数
read
也没有什么好说的.
writev函数
好吧,我写这一节,其实只是想吐槽一下writev
函数的,因为我被它坑到了.如果你用这个函数来处理非阻塞的文件描述符,应该会感觉这个玩意简直和鸡肋一毛一样.
man手册里说,它的行为和write
差不多:
The writev() system call works just like write(2) except that multiple buffers are written out.
不过writev
是分散写,也就是你的数据可以这里一块,那里一块,然后只要将这些数据的首地址,长度什么的写到一个iovc
的结构体数组里,传递给writev
,writev
就帮你来写这些数据,在你看来,分散的数据就像是连成了一体.
对于阻塞IO
,这个函数应该是很好用的,对于非阻塞IO,你如果想用的话,要做的工作估计还很多,我们假想一下,如果writev
返回一个大于0
的值num
,这个值又小于所有要传递的文件块的总长度,这意味着什么,意味着数据还没有写完啊.如果你还想写的话,你下一次调用writev
的时候要重新整理iovc
数组,这坑爹呢.
首先,你要一块一块比对大小,确定已经写了多少块数据,然后对于那个写了一点的块,要将iovc[0].iov_base
指向下一个开始的字节…,好吧,听起来就烦,好吧,你看到了,其实还不如直接用write
呢.
我的代码
变化颇多的代码
为了使用IO多路复用机制,这次的代码在之前的代码上做了很多的修改,昨晚之后,为了使代码效率更高,我使用了Cache
技术,具体而言,就是说,每次加载了文件,不是用完了马上就卸载掉,而是暂时保存起来,这样可以大大加快程序的处理速度.我并没有一上来就搞起多线程,如果单线程都跑出错的话,多线程就更不要谈了.
主函数
主函数是一个典型的epoll
形式的写法,如果连接到来,立马处理连接,有数据可读立马去读,如果缓冲区可用,则立马去写.值得一提的是,这里使用的是epoll
的LT
模式,这个模式有一个特点就是,如果对方发送的数据你没有读完的话,它会一直触发,或者说,如果系统的TCP
缓冲区可用了,而你没有理会的话,它也会一直触发.
int main(int argc, char *argv[])
{
int listenfd = Open_listenfd(8080); /* 8080号端口监听 */
epoll_event events[MAXEVENTNUM];
sockaddr clnaddr;
socklen_t clnlen = sizeof(clnaddr);
addsig(SIGPIPE, SIG_IGN);
int epollfd = Epoll_create(80); /* 10基本上没有什么用处 */
addfd(epollfd, listenfd, false); /* epollfd要监听listenfd上的可读事件 */
HttpHandle handle[256];
int acnt = 0;
for ( ; ;) {
int eventnum = Epoll_wait(epollfd, events, MAXEVENTNUM, -1);
for (int i = 0; i < eventnum; ++i) {
int sockfd = events[i].data.fd;
if (sockfd == listenfd) { /* 有连接到来 */
printf("%d\n", ++acnt);
int connfd = Accept(listenfd, &clnaddr, &clnlen);
handle[connfd].init(connfd); /* 初始化 */
addfd(epollfd, connfd, false); /* 加入监听 */
}
else if (events[i].events & EPOLLIN) { /* 有数据可读 */
int res = handle[sockfd].processRead(); /* 处理读事件 */
if (res == STATUS_WRITE) /* 我们需要监听写事件 */
modfd(epollfd, sockfd, EPOLLOUT);
else
removefd(epollfd, sockfd);
}
else if (events[i].events & EPOLLOUT) { /* 如果可写了 */
printf("Could write!\n");
int res = handle[sockfd].processWrite(); /* 处理写事件 */
if (res == STATUS_READ) /* 对方发送了keepalive */
modfd(epollfd, sockfd, EPOLLIN);
else
removefd(epollfd, sockfd);
}
}
}
return 0;
}
将http处理代码封装起来
为了管理起来更加方便,我将http
处理的代码封装到了一个HttpHandle
的类之中.
我们一起来看一下这个类:
class HttpHandle : public noncopyable /* 不可以被拷贝,不可以被复制 */
{
public:
static const int READ_BUFFER_SIZE = 1024; /* 读缓冲区的大小 */
static const int WRITE_BUFFER_SIZE = 1024; /* 写缓冲区的大小 */
private:
static Cache& cache_; /* 全局只需要一个cache_*/
/* 该HTTP连接的socket和对方的socket地址 */
int sockfd_;
boost::shared_ptr<FileInfo> fileInfo_;
/* 读缓冲区 */
char readBuf_[READ_BUFFER_SIZE];
/* 标志读缓冲区中已经读入的客户数据的最后一个字节的下一个位置 */
int nRead_;
/* 当前正在分析的字符在读缓冲区中的位置 */
int nChecked_;
bool keepAlive_; /* 是否保持连接 */
bool sendFile_; /* 是否发送文件 */
/* 写缓冲区 */
char writeBuf_[WRITE_BUFFER_SIZE];
/* 写缓冲区中待发送的字节数 */
int nStored_;
/* 已经写了多少字节 */
int written_;
}
正如我前面所说的,一旦使用非阻塞IO
,代码的复杂度就上来了,我们必须考虑配备读写缓冲区.sockfd_
用于记录与客户端连接的socket
描述符.fileInfo_
记录了要发送的文件的一些信息.
当然,我们还要记录读到哪里啦,写到哪里啦,所以就有了nRead_
,nChecked_
,nStored_
,written_
在这些变量.
好吧,我们来看一些代码:
bool HttpHandle::read()
{
/* 我们尽量一次性将数据全部读尽 */
nRead_ = 0; /* 首先要清零 */
nChecked_ = 0;
if (nRead_ >= READ_BUFFER_SIZE) {
return false;
}
int byte_read = 0;
while (true) {
byte_read = recv(sockfd_, readBuf_ + nRead_, READ_BUFFER_SIZE - nRead_, 0);
if (byte_read == -1) { /* 代表出错了 */
break;
}
else if (byte_read == 0) { /* 对方已经关闭了连接 */
return false;
}
nRead_ += byte_read; /* 已经读取的字节 */
}
return true;
}
这是read
函数,需要一次性将对方发来的数据读尽.当然,你也可以不读完,那么epoll
机制仍然会提醒你可读.不过这样编码起来,代码会复杂很多.
int HttpHandle::processRead()
{
int is_static;
struct stat sbuf;
char buf[MAXLINE], method[MAXLINE], uri[MAXLINE], version[MAXLINE];
char filename[MAXLINE], cgiargs[MAXLINE];
char line[MAXLINE];
if (false == read()) { /* 对方已经关闭了连接 */
return STATUS_CLOSE;
}
/* 接下来开始解析读入的数据 */
getLine(line, MAXLINE); /* 读取一行数据 */
/* 使用sscanf函数确实是一个非常棒的办法! */
sscanf(line, "%s %s %s", method, uri, version);
if (strcasecmp(method, "GET")) {
clienterror(method, "501", "Not Implemented",
"Tiny does not implement this method");
goto end;
}
readRequestHdrs(); /* 处理剩余的请求头部 */
/* Parse URI from GET request */
is_static = parseUri(uri, filename, cgiargs);
if (stat(filename, &sbuf) < 0) {
clienterror(filename, "404", "Not found",
"Tiny couldn‘t find this file"); /* 没有找到文件 */
goto end;
}
if (is_static) { /* Serve static content */
if (!(S_ISREG(sbuf.st_mode)) || !(S_IRUSR & sbuf.st_mode)) {
clienterror(filename, "403", "Forbidden",
"Tiny couldn‘t read the file"); /* 权限不够 */
goto end;
}
serveStatic(filename, sbuf.st_size);
}
else { /* Serve dynamic content */
clienterror(method, "501", "Not Implemented",
"Tiny does not implement this method");
goto end;
}
end:
return processWrite();
}
processRead
函数和之前版本的doit
基本上是类似的,这里就不再详细展开.
与之前最大的不同是,下面的这几个函数:
void HttpHandle::clienterror(char *cause, char *errnum, char *shortmsg, char *longmsg)
{
char buf[MAXLINE], body[MAXBUF];
/* Build the HTTP response body */
sprintf(body, "<html><title>Tiny Error</title>");
sprintf(body, "%s<body bgcolor=""ffffff"">\r\n", body);
sprintf(body, "%s%s: %s\r\n", body, errnum, shortmsg);
sprintf(body, "%s<p>%s: %s\r\n", body, longmsg, cause);
sprintf(body, "%s<hr><em>The Tiny Web server</em>\r\n", body);
/* Print the HTTP response */
addResponse("HTTP/1.0 %s %s\r\n", errnum, shortmsg);
addResponse("Content-type: text/html\r\n");
addResponse("Content-length: %d\r\n\r\n", (int)strlen(body));
addResponse("%s", body);
}
我们继续来查看addResponse
这个函数:
bool HttpHandle::addResponse(const char* format, ...)
{
if (nStored_ >= WRITE_BUFFER_SIZE) {
return false;
}
va_list arg_list;
va_start(arg_list, format);
int len = vsnprintf(writeBuf_ + nStored_, WRITE_BUFFER_SIZE - 1 - nStored_, format, arg_list); /* 将数据输入到writeBuf_中 */
if (len >= (WRITE_BUFFER_SIZE - 1 - nStored_)) {
return false;
}
nStored_ += len;
va_end(arg_list);
return true;
}
我们写的时候并不是直接发送给对方,而是先写入到本地的写缓冲区里面,这也很好理解,因为我们使用了非阻塞IO
,写的话,很有肯能一次性写不完,因为不会像以前的版本一样阻塞到写完为止,所以我们必须将数据保存下来,这次写不完,下次接着写.
在前面的processRead
函数的最后调用了processWrite
函数,它的实现如下:
int HttpHandle::processWrite()
{
int res;
/*-
* 数据要作为两部分发送,第1步,要发送writeBuf_里面的数据.
*/
int nRemain = strlen(writeBuf_) - written_; /* writeBuf_中还有多少字节要写 */
if (nRemain > 0) {
while (true) {
nRemain = strlen(writeBuf_) - written_;
res = write(sockfd_, writeBuf_ + written_, nRemain);
if (res < 0) {
if (errno == EAGAIN) { /* 资源暂时不可用 */
return STATUS_WRITE;
}
return STATUS_ERROR;
}
written_ += res;
if (written_ == strlen(writeBuf_))
break;
}
}
/*-
* 第2步,要发送html网页数据.
*/
if (sendFile_) {
int bytesToSend = fileInfo_->size_ + strlen(writeBuf_); /* 总共需要发送的字节数目 */
while (true) {
int offset = written_ - strlen(writeBuf_);
res = write(sockfd_, (char *)fileInfo_->addr_ + offset, fileInfo_->size_ - offset);
if (res < 0) {
if (errno == EAGAIN) { /* 资源暂时不可用 */
return STATUS_WRITE;
}
return STATUS_ERROR;
}
written_ += res;
if (written_ == bytesToSend)
break;
}
}
/* 数据发送完毕 */
reset();
if (keepAlive_) /* 如果需要保持连接的话 */
return STATUS_READ;
else
return STATUS_SUCCESS;
}
数据发送可不是一件容易的事情,我的实现是这样的,前面构造的头部信息保存在writeBuf_
中,文件的信息保存在fileInfo_
这个结构中,所以要发送两次,第一次发送头部信息,第二次再发送文件信息.
在发送的过程中要考虑各种情况,如果系统的TCP
的缓冲区已经满了,不能接收我们的数据了,我们要退出这次发送.否则的话,就一直发送,等到全部数据发送完毕为止.
你可能会感到奇怪,为什么我要分两次发送,为什么不直接将文件的信息写入到writeBuf_
之中?这是因为效率的考虑,我们要尽量消除不必要的拷贝,如果要实现高性能的服务器的话.我们可以想象得到,有一些页面会特别受到用户的追捧,访问量会特别大,如果我们每一次都将这个页面从服务器加载,发送完了之后再关闭,这样会浪费我们多少cpu
资源,为了更快的访问速度,我们的这个web
服务器设计了一个Cache
机制,将每次发送的页面缓存起来,下一次要用到这个页面的时候,直接用就可以了,不用加载.这样服务器的效率就变得高得多了.
Cache
机制
为了保存文件的信息,我们设计了一个FileInfo
类.
class FileInfo : noncopyable
{
public:
FileInfo(std::string& fileName, int fileSize) {
int srcfd = Open(fileName.c_str(), O_RDONLY, 0); /* 打开文件 */
size_ = fileSize;
addr_ = Mmap(0, fileSize, PROT_READ, MAP_PRIVATE, srcfd, 0);
Close(srcfd); /* 关闭文件 */
}
~FileInfo() {
Munmap(addr_, size_); /* 解除映射 */
}
public:
void *addr_; /* 地址信息 */
int size_; /* 文件大小 */
};
这个类非常简单,具体而言,就是在构建类的时候,自动映射文件,类析构的时候实现解除映射.
为了管理FileInfo
,我们设计了一个Cache
类.我们来看一看吧.
class Cache : noncopyable
{
typedef std::map<std::string, boost::shared_ptr<FileInfo>>::iterator it;
private:
std::map<std::string, boost::shared_ptr<FileInfo>> cache_; /* 实现文件名称到地址的一个映射 */
static const size_t MAX_CACHE_SIZE = 100; /* 最多缓存100个文件 */
...
}
这个Cache
类中其实就是记录了一个文件名到文件地址的映射关系.它有一个重要的查找函数getFileAddr
:
boost::shared_ptr<FileInfo> getFileAddr(std::string fileName, int fileSize) {
if (cache_.end() != cache_.find(fileName)) { /* 如果在cache中找到了 */
return cache_[fileName];
}
if (cache_.size() >= MAX_CACHE_SIZE) { /* 文件数目过多,需要删除一个元素 */
cache_.erase(cache_.begin()); /* 直接移除掉最前一个元素 */
}
/* 没有找到的话,我们需要加载文件 */
boost::shared_ptr<FileInfo> fileInfo(new FileInfo(fileName, fileSize));
cache_[fileName] = fileInfo;
return fileInfo;
}
思想很简单,那就是如果找到了,立马返回这个地址,如果缓存的文件过多,那么要删除一个元素,我这里实现的是最简单的,直接删除最前的一个元素,你也可以实现更加高效的算法.
如果没有找到的话,要重新从磁盘上加载这个文件.我们使用shared_ptr
来管理资源,这种类型的指针的好处在于,只要你持有这种指针,那么指针指向的对象便不会被析构掉,恰好符合我们的要求.
我们继续来看HttpHandle
类的serveStatic
函数:
void HttpHandle::serveStatic(char *fileName, int fileSize)
{ /* 用于处理静态的网页 */
int srcfd;
char fileType[MAXLINE], buf[MAXBUF];
/* 构造头部信息 */
getFileType(fileName, fileType);
sprintf(buf, "HTTP/1.0 200 OK\r\n");
sprintf(buf, "%sServer: Tiny Web Server\r\n", buf);
sprintf(buf, "%sContent-length: %d\r\n", buf, fileSize);
sprintf(buf, "%sContent-type: %s\r\n\r\n", buf, fileType);
addResponse(buf);
fileInfo_ = cache_.getFileAddr(fileName, fileSize); /* 添加文件 */
sendFile_ = true;
}
这个函数在这个时候就应该变得很简单了,就是构造头部信息,获得文件信息而已.
状态机
非阻塞IO
的引入使得代码变得复杂起来,我们不能用之前的阻塞IO
的思想来解决现在的问题了,网页的处理其实已经变成了一个状态机了,首先是对方来了连接,我们处理这个连接的HttpHandle
就要初始化,对方发来的数据,我们处理这个连接的HttpHandle
立马转为读取状态,读取完成后立马转入发送状态,发送完成之后要清理资源.
下一次连接依旧如此.
缺陷
这个版本的web server
并发度有所提高,但是还存在不少的缺陷,代码也不是很漂亮,接下来版本的迭代中,我们将逐步解决这个问题.
我来讲一讲缺陷在哪里:
- 代码假设对方总是善意的,总是一次性可以接收到所有的
request
信息,这是不现实的,虽然大部分时候是如此,但是路走多了,总会碰到鬼,如果对方第一次发送了这样的信息:
GET /
然后停顿了30
秒,才发送接下来的:
Http/1.1\r\n...
我上面的代码势必会出错,因为对于客户端发送的消息,我只读了一次,也就是说上面的read
函数只会读到GET /
信息,接下来的代码中就parse
这些信息,发送回复.很显然,会出错.
如果对方发送了
request
,立马关闭了连接,我们向对方发送数据,这个时候根据UNP
的描述,发送第二次的时候,会引发SIGPIPE
消息,它的默认行为是关闭程序,我们是否做到了忽略这个SIGPIPE
?对方老是要求
keepAlive
,一直连着不放,企图耗尽服务器资源怎么办?
类似的问题需要考量的东西还有很多,写一个完备的服务器可真不是一件容易的事情.
具体代码还是看这里吧!:https://github.com/lishuhuakai/Spweb
1
以上是关于一起来写web server 06 -- 单线程非阻塞IO版本的主要内容,如果未能解决你的问题,请参考以下文章