一起来写web server 06 -- 单线程非阻塞IO版本

Posted lishuhuakai

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一起来写web server 06 -- 单线程非阻塞IO版本相关的知识,希望对你有一定的参考价值。


阻塞IO的效率是在是低下,你如果要写高性能的web server的话,你必须使用非阻塞IO.

非阻塞IO的读写

在谈到非阻塞IO之前,必须先谈一谈阻塞IO,在网络编程中,我们假设有一个监听套接字的sockfd,这个sockfd是你调用了socket,listen, bind这一大票的函数得到的一个文件描述符.其实它默认就是阻塞的,具体的表现是:

  1. 使用accept函数监听sockfd时,如果没有连接到来,这个函数会一直阻塞在那里.

  2. sockfd调用recv函数的时候,如果对方还没有发送数据过来,这个函数也会一直阻塞.

  3. sockfd执行write操作的时候,如果tcp缓冲区已经满了,那么write函数也会阻塞在那里,一直到数据写完了才返回.

  4. ......

非阻塞的IO确实没有什么不好的,编程简单,逻辑清晰,但是在今天的话,有一个致命的缺点,那就是资源的利用率不高,具体来说,就是通过阻塞IO编写的服务端程序只有很少的时间在工作,大部分时间都处在阻塞状态.

为了提高程序的性能,我们应该充分利用线程阻塞的那段时间,毕竟那么长的时间里,我们可以干很多的事情,所以聪明的程序员弄出了个非阻塞IO的玩意,具体而言,就是一旦对一个文件描述符设置了非阻塞,比如说前面的sockfd,我们调用accept, write, recv等函数的时候,如果碰到数据没到,缓冲区已满这类事情,不会像之前那样阻塞在那里,这些函数会立即返回,并且设置一些标志让调用者知道.

单纯的非阻塞IO其实并没有多么大的用处,假如你要写向sockfd写入数据,如果sockfd是非阻塞的话,你大概会这样编程:

while (true) {
  ret = write(sockfd, buf, sizeof(buf));
  if 数据写完了
      break;
  if 资源不可用 or 还有剩余数据没写完
    continue;
}

如果我们用上面的方法来编写代码的话,还不如用直接用阻塞IO呢,轮询可比阻塞傻逼多了.

非阻塞IO加上epoll, select, poll这些IO多路复用机制,我们才可以高效地利用以前被阻塞的时光.高效确实非常高效,不过我想说的一点是 –

非阻塞IO加上这些IO复用机制,会使得代码的复杂度急剧上升.如果你用这些机制来编写网络程序的话,更加要小心,因为这些东西调试起来并不是那么方便,代码很可能死在一个微小而不易察觉的点上.

我顺带扯一下这些东西复杂在哪里:

  1. 首先,由于监听套接字的文件描述符listenfd变得非阻塞,所以你要监听这个描述符上的可读事件,当然,这很简单.

  2. 如果连接到来了,新建连接,你要设置新来的连接的文件描述符的监听,非阻塞IO就是这么个事情,拉弓没有回头箭,一旦用了,几乎所有的套接字的描述符都要设置为非阻塞.这也不难.

  3. 一旦对方发送了数据,你要读取,像在epoll里面,为了提升效率,我们一般都会设置ET模式,这意味着你要一次性读完fd上的数据.不能说你想读多少就读多少,边读边处理,这样的话,你或许不得不用一个buf来缓存读到的数据,并且记录你已经处理了的数据的数目.

  4. 系统的tcp缓冲区很容易就填满了,因为你的IO是非阻塞的,所以一旦发生了缓冲区满这种事情,你不大可能等待到缓冲区可用,所以你也要监听这个套接字的可写事件,并且为了保存数据,你要自己弄一个buf,要记录下来已经写了多少,还有多少数据要写,下次写的时候可以从上次中断的地方开始.

  5. 如果epoll,非阻塞IO碰上了多线程,复杂度还会上升.因为我们还必须处理一些竞争条件.

非阻塞IO的核心思想是避免阻塞在readwrite或其他IO系统调用上,这样可以最大限度地复用thread-of-control,让一个线程能够服务于多个socket连接,IO线程只能阻塞在IO multiplexing函数上,如select/poll/epoll_wait.这样一来,应用层的缓冲是必需的,每个TCP socket都要有statefulinput bufferoutput buffer. – linux多线程服务端编程

从前面的分析可以看得到,为了提高效率,我们真的牺牲了很多.要做的工作也增加了很多.当然,我可不想讲什么epoll,其实我更想谈的是非阻塞IOwrite, read这些函数里的一些表现.

write函数

首先是write函数,write函数如果返回的值是大于等于0的话,那表示的是已经写入的字节数目.返回-1的话,一般会设置errno,通过判别errno我们可以知道到底是因为什么出错.

read函数

read也没有什么好说的.

writev函数

好吧,我写这一节,其实只是想吐槽一下writev函数的,因为我被它坑到了.如果你用这个函数来处理非阻塞的文件描述符,应该会感觉这个玩意简直和鸡肋一毛一样.

man手册里说,它的行为和write差不多:

The writev() system call works just like write(2) except that multiple buffers are written out.

不过writev是分散写,也就是你的数据可以这里一块,那里一块,然后只要将这些数据的首地址,长度什么的写到一个iovc的结构体数组里,传递给writev,writev就帮你来写这些数据,在你看来,分散的数据就像是连成了一体.

对于阻塞IO,这个函数应该是很好用的,对于非阻塞IO,你如果想用的话,要做的工作估计还很多,我们假想一下,如果writev返回一个大于0的值num,这个值又小于所有要传递的文件块的总长度,这意味着什么,意味着数据还没有写完啊.如果你还想写的话,你下一次调用writev的时候要重新整理iovc数组,这坑爹呢.

首先,你要一块一块比对大小,确定已经写了多少块数据,然后对于那个写了一点的块,要将iovc[0].iov_base指向下一个开始的字节…,好吧,听起来就烦,好吧,你看到了,其实还不如直接用write呢.

我的代码

变化颇多的代码

为了使用IO多路复用机制,这次的代码在之前的代码上做了很多的修改,昨晚之后,为了使代码效率更高,我使用了Cache技术,具体而言,就是说,每次加载了文件,不是用完了马上就卸载掉,而是暂时保存起来,这样可以大大加快程序的处理速度.我并没有一上来就搞起多线程,如果单线程都跑出错的话,多线程就更不要谈了.

主函数

主函数是一个典型的epoll形式的写法,如果连接到来,立马处理连接,有数据可读立马去读,如果缓冲区可用,则立马去写.值得一提的是,这里使用的是epollLT模式,这个模式有一个特点就是,如果对方发送的数据你没有读完的话,它会一直触发,或者说,如果系统的TCP缓冲区可用了,而你没有理会的话,它也会一直触发.

int main(int argc, char *argv[])
{
    int listenfd = Open_listenfd(8080); /* 8080号端口监听 */
    epoll_event events[MAXEVENTNUM];
    sockaddr clnaddr;
    socklen_t clnlen = sizeof(clnaddr);

    addsig(SIGPIPE, SIG_IGN);

    int epollfd = Epoll_create(80); /* 10基本上没有什么用处 */
    addfd(epollfd, listenfd, false); /* epollfd要监听listenfd上的可读事件 */

    HttpHandle handle[256];
    int acnt = 0;
    for ( ; ;) {
        int eventnum = Epoll_wait(epollfd, events, MAXEVENTNUM, -1);
        for (int i = 0; i < eventnum; ++i) {
            int sockfd = events[i].data.fd;
            if (sockfd == listenfd) { /* 有连接到来 */
                printf("%d\n", ++acnt);
                int connfd = Accept(listenfd, &clnaddr, &clnlen);
                handle[connfd].init(connfd); /* 初始化 */
                addfd(epollfd, connfd, false); /* 加入监听 */
            }
            else if (events[i].events & EPOLLIN) { /* 有数据可读 */
                int res = handle[sockfd].processRead(); /* 处理读事件 */
                if (res == STATUS_WRITE)  /* 我们需要监听写事件 */
                    modfd(epollfd, sockfd, EPOLLOUT);
                else 
                    removefd(epollfd, sockfd);
            }
            else if (events[i].events & EPOLLOUT) { /* 如果可写了 */
                printf("Could write!\n");
                int res = handle[sockfd].processWrite(); /* 处理写事件 */
                if (res == STATUS_READ) /* 对方发送了keepalive */
                    modfd(epollfd, sockfd, EPOLLIN);
                else
                    removefd(epollfd, sockfd);
            }
        }
    }
    return 0;
}

将http处理代码封装起来

为了管理起来更加方便,我将http处理的代码封装到了一个HttpHandle的类之中.

我们一起来看一下这个类:

class HttpHandle : public noncopyable /* 不可以被拷贝,不可以被复制 */
{
public:
    static const int READ_BUFFER_SIZE = 1024; /* 读缓冲区的大小 */
    static const int WRITE_BUFFER_SIZE = 1024; /* 写缓冲区的大小 */
private:
    static Cache& cache_; /* 全局只需要一个cache_*/
    /* 该HTTP连接的socket和对方的socket地址 */
    int sockfd_;
    boost::shared_ptr<FileInfo> fileInfo_;

    /* 读缓冲区 */
    char readBuf_[READ_BUFFER_SIZE];
    /* 标志读缓冲区中已经读入的客户数据的最后一个字节的下一个位置 */
    int nRead_;
    /* 当前正在分析的字符在读缓冲区中的位置 */
    int nChecked_;

    bool keepAlive_; /* 是否保持连接 */
    bool sendFile_; /* 是否发送文件 */

    /* 写缓冲区 */
    char writeBuf_[WRITE_BUFFER_SIZE];
    /* 写缓冲区中待发送的字节数 */
    int nStored_;
    /* 已经写了多少字节 */
    int written_; 
}

正如我前面所说的,一旦使用非阻塞IO,代码的复杂度就上来了,我们必须考虑配备读写缓冲区.sockfd_用于记录与客户端连接的socket描述符.fileInfo_记录了要发送的文件的一些信息.
当然,我们还要记录读到哪里啦,写到哪里啦,所以就有了nRead_,nChecked_,nStored_,written_在这些变量.

好吧,我们来看一些代码:

bool HttpHandle::read()
{
    /* 我们尽量一次性将数据全部读尽 */
    nRead_ = 0; /* 首先要清零 */
    nChecked_ = 0;
    if (nRead_ >= READ_BUFFER_SIZE) {
        return false;
    }
    int byte_read = 0;
    while (true) {
        byte_read = recv(sockfd_, readBuf_ + nRead_, READ_BUFFER_SIZE - nRead_, 0);
        if (byte_read == -1) {  /* 代表出错了 */
            break;
        }
        else if (byte_read == 0) { /* 对方已经关闭了连接 */
            return false;
        }
        nRead_ += byte_read; /* 已经读取的字节 */
    }
    return true;
}

这是read函数,需要一次性将对方发来的数据读尽.当然,你也可以不读完,那么epoll机制仍然会提醒你可读.不过这样编码起来,代码会复杂很多.

int HttpHandle::processRead()
{
    int is_static;
    struct stat sbuf;
    char buf[MAXLINE], method[MAXLINE], uri[MAXLINE], version[MAXLINE];
    char filename[MAXLINE], cgiargs[MAXLINE];
    char line[MAXLINE];

    if (false == read()) { /* 对方已经关闭了连接 */
        return STATUS_CLOSE;
    }

    /* 接下来开始解析读入的数据 */
    getLine(line, MAXLINE); /* 读取一行数据 */
    /* 使用sscanf函数确实是一个非常棒的办法! */
    sscanf(line, "%s %s %s", method, uri, version);      
    if (strcasecmp(method, "GET")) {               
        clienterror(method, "501", "Not Implemented",
            "Tiny does not implement this method");
        goto end;
    }
    readRequestHdrs();  /* 处理剩余的请求头部 */
                                                     /* Parse URI from GET request */
    is_static = parseUri(uri, filename, cgiargs);
    if (stat(filename, &sbuf) < 0) { 
        clienterror(filename, "404", "Not found",
            "Tiny couldn‘t find this file"); /* 没有找到文件 */
        goto end;
    }                                                   

    if (is_static) { /* Serve static content */
        if (!(S_ISREG(sbuf.st_mode)) || !(S_IRUSR & sbuf.st_mode)) {
            clienterror(filename, "403", "Forbidden",
                "Tiny couldn‘t read the file"); /* 权限不够 */
            goto end;
        }
        serveStatic(filename, sbuf.st_size);       
    }
    else { /* Serve dynamic content */
        clienterror(method, "501", "Not Implemented",
            "Tiny does not implement this method");
        goto end;
    }
    end:
    return processWrite();
}

processRead函数和之前版本的doit基本上是类似的,这里就不再详细展开.
与之前最大的不同是,下面的这几个函数:

void HttpHandle::clienterror(char *cause, char *errnum, char *shortmsg, char *longmsg)
{
    char buf[MAXLINE], body[MAXBUF];

    /* Build the HTTP response body */
    sprintf(body, "<html><title>Tiny Error</title>");
    sprintf(body, "%s<body bgcolor=""ffffff"">\r\n", body);
    sprintf(body, "%s%s: %s\r\n", body, errnum, shortmsg);
    sprintf(body, "%s<p>%s: %s\r\n", body, longmsg, cause);
    sprintf(body, "%s<hr><em>The Tiny Web server</em>\r\n", body);

    /* Print the HTTP response */
    addResponse("HTTP/1.0 %s %s\r\n", errnum, shortmsg);
    addResponse("Content-type: text/html\r\n");
    addResponse("Content-length: %d\r\n\r\n", (int)strlen(body));
    addResponse("%s", body);
}

我们继续来查看addResponse这个函数:

bool HttpHandle::addResponse(const char* format, ...)
{
    if (nStored_ >= WRITE_BUFFER_SIZE) {
        return false;
    }
    va_list arg_list;
    va_start(arg_list, format);
    int len = vsnprintf(writeBuf_ + nStored_, WRITE_BUFFER_SIZE - 1 - nStored_, format, arg_list); /* 将数据输入到writeBuf_中 */
    if (len >= (WRITE_BUFFER_SIZE - 1 - nStored_)) {
        return false;
    }
    nStored_ += len;
    va_end(arg_list);
    return true;
}

我们写的时候并不是直接发送给对方,而是先写入到本地的写缓冲区里面,这也很好理解,因为我们使用了非阻塞IO,写的话,很有肯能一次性写不完,因为不会像以前的版本一样阻塞到写完为止,所以我们必须将数据保存下来,这次写不完,下次接着写.

在前面的processRead函数的最后调用了processWrite函数,它的实现如下:

int HttpHandle::processWrite()
{
    int res;
    /*- 
    * 数据要作为两部分发送,第1步,要发送writeBuf_里面的数据.
    */
    int nRemain = strlen(writeBuf_) - written_; /* writeBuf_中还有多少字节要写 */
    if (nRemain > 0) {
        while (true) {
            nRemain = strlen(writeBuf_) - written_;
            res = write(sockfd_, writeBuf_ + written_, nRemain);
            if (res < 0) {
                if (errno == EAGAIN) { /* 资源暂时不可用 */
                    return STATUS_WRITE;
                }
                return STATUS_ERROR;
            }
            written_ += res;
            if (written_ == strlen(writeBuf_))
                break;
        }
    }

    /*-
    * 第2步,要发送html网页数据.
    */
    if (sendFile_) {
        int bytesToSend = fileInfo_->size_ + strlen(writeBuf_); /* 总共需要发送的字节数目 */
        while (true) {
            int offset = written_ - strlen(writeBuf_);
            res = write(sockfd_, (char *)fileInfo_->addr_ + offset, fileInfo_->size_ - offset);
            if (res < 0) {
                if (errno == EAGAIN) { /* 资源暂时不可用 */
                    return STATUS_WRITE;
                }
                return STATUS_ERROR;
            }
            written_ += res;
            if (written_ == bytesToSend)
                break;
        }

    }

    /* 数据发送完毕 */
    reset();
    if (keepAlive_)  /* 如果需要保持连接的话 */
        return STATUS_READ;
    else 
        return STATUS_SUCCESS;
}

数据发送可不是一件容易的事情,我的实现是这样的,前面构造的头部信息保存在writeBuf_中,文件的信息保存在fileInfo_这个结构中,所以要发送两次,第一次发送头部信息,第二次再发送文件信息.

在发送的过程中要考虑各种情况,如果系统的TCP的缓冲区已经满了,不能接收我们的数据了,我们要退出这次发送.否则的话,就一直发送,等到全部数据发送完毕为止.

你可能会感到奇怪,为什么我要分两次发送,为什么不直接将文件的信息写入到writeBuf_之中?这是因为效率的考虑,我们要尽量消除不必要的拷贝,如果要实现高性能的服务器的话.我们可以想象得到,有一些页面会特别受到用户的追捧,访问量会特别大,如果我们每一次都将这个页面从服务器加载,发送完了之后再关闭,这样会浪费我们多少cpu资源,为了更快的访问速度,我们的这个web服务器设计了一个Cache机制,将每次发送的页面缓存起来,下一次要用到这个页面的时候,直接用就可以了,不用加载.这样服务器的效率就变得高得多了.

Cache机制

为了保存文件的信息,我们设计了一个FileInfo类.

class FileInfo : noncopyable
{
public:
    FileInfo(std::string& fileName, int fileSize) {
        int srcfd = Open(fileName.c_str(), O_RDONLY, 0); /* 打开文件 */
        size_ = fileSize;
        addr_ = Mmap(0, fileSize, PROT_READ, MAP_PRIVATE, srcfd, 0);
        Close(srcfd); /* 关闭文件 */
    }
    ~FileInfo() {
        Munmap(addr_, size_); /* 解除映射 */
    }
public:
    void *addr_; /* 地址信息 */
    int size_; /* 文件大小 */
};

这个类非常简单,具体而言,就是在构建类的时候,自动映射文件,类析构的时候实现解除映射.

为了管理FileInfo,我们设计了一个Cache类.我们来看一看吧.

class Cache : noncopyable
{
    typedef std::map<std::string, boost::shared_ptr<FileInfo>>::iterator it;
private:
    std::map<std::string, boost::shared_ptr<FileInfo>> cache_; /* 实现文件名称到地址的一个映射 */
    static const size_t MAX_CACHE_SIZE = 100; /* 最多缓存100个文件 */
    ...
}

这个Cache类中其实就是记录了一个文件名到文件地址的映射关系.它有一个重要的查找函数getFileAddr:

boost::shared_ptr<FileInfo> getFileAddr(std::string fileName, int fileSize) {
        if (cache_.end() != cache_.find(fileName)) { /* 如果在cache中找到了 */
            return cache_[fileName];
        }
        if (cache_.size() >= MAX_CACHE_SIZE) { /* 文件数目过多,需要删除一个元素 */
            cache_.erase(cache_.begin()); /* 直接移除掉最前一个元素 */
        }
        /* 没有找到的话,我们需要加载文件 */
        boost::shared_ptr<FileInfo> fileInfo(new FileInfo(fileName, fileSize));
        cache_[fileName] = fileInfo;
        return fileInfo;
    }

思想很简单,那就是如果找到了,立马返回这个地址,如果缓存的文件过多,那么要删除一个元素,我这里实现的是最简单的,直接删除最前的一个元素,你也可以实现更加高效的算法.

如果没有找到的话,要重新从磁盘上加载这个文件.我们使用shared_ptr来管理资源,这种类型的指针的好处在于,只要你持有这种指针,那么指针指向的对象便不会被析构掉,恰好符合我们的要求.

我们继续来看HttpHandle类的serveStatic函数:

void HttpHandle::serveStatic(char *fileName, int fileSize)
{ /* 用于处理静态的网页 */
    int srcfd;
    char fileType[MAXLINE], buf[MAXBUF];

    /* 构造头部信息 */
    getFileType(fileName, fileType);   
    sprintf(buf, "HTTP/1.0 200 OK\r\n");
    sprintf(buf, "%sServer: Tiny Web Server\r\n", buf);
    sprintf(buf, "%sContent-length: %d\r\n", buf, fileSize);
    sprintf(buf, "%sContent-type: %s\r\n\r\n", buf, fileType);
    addResponse(buf);
    fileInfo_ = cache_.getFileAddr(fileName, fileSize); /* 添加文件 */
    sendFile_ = true;
}

这个函数在这个时候就应该变得很简单了,就是构造头部信息,获得文件信息而已.

状态机

非阻塞IO的引入使得代码变得复杂起来,我们不能用之前的阻塞IO的思想来解决现在的问题了,网页的处理其实已经变成了一个状态机了,首先是对方来了连接,我们处理这个连接的HttpHandle就要初始化,对方发来的数据,我们处理这个连接的HttpHandle立马转为读取状态,读取完成后立马转入发送状态,发送完成之后要清理资源.

下一次连接依旧如此.

缺陷

这个版本的web server并发度有所提高,但是还存在不少的缺陷,代码也不是很漂亮,接下来版本的迭代中,我们将逐步解决这个问题.

我来讲一讲缺陷在哪里:

  • 代码假设对方总是善意的,总是一次性可以接收到所有的request信息,这是不现实的,虽然大部分时候是如此,但是路走多了,总会碰到鬼,如果对方第一次发送了这样的信息:
GET /

然后停顿了30秒,才发送接下来的:

 Http/1.1\r\n...

我上面的代码势必会出错,因为对于客户端发送的消息,我只读了一次,也就是说上面的read函数只会读到GET /信息,接下来的代码中就parse这些信息,发送回复.很显然,会出错.

  • 如果对方发送了request,立马关闭了连接,我们向对方发送数据,这个时候根据UNP的描述,发送第二次的时候,会引发SIGPIPE消息,它的默认行为是关闭程序,我们是否做到了忽略这个SIGPIPE?

  • 对方老是要求keepAlive,一直连着不放,企图耗尽服务器资源怎么办?

类似的问题需要考量的东西还有很多,写一个完备的服务器可真不是一件容易的事情.

具体代码还是看这里吧!:https://github.com/lishuhuakai/Spweb

1



以上是关于一起来写web server 06 -- 单线程非阻塞IO版本的主要内容,如果未能解决你的问题,请参考以下文章

单线程 异步 同步 阻塞 非阻塞

selenium一个完整的unittest测试框架格式(单线程,非测试报告)

使用 web.py 作为非阻塞 http-server

IO流中「线程」模型总结

nodejs是单线程还是多线程

Redis入门到高可用—— 单线程