  1. The client message-processing loop

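    As the previous section showed, Redis uses epoll to detect incoming client connections. When a connection request arrives, acceptTcpHandler is called, which in turn calls createClient to associate the fd with a client and finally append the client to the global server.clients list. Inside createClient, connSetReadHandler(conn, readQueryFromClient) is called to register the client with epoll.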

    In the implementation of the set_read_handler interface, note that the callback bound to the fd's read and write events is ae_handler, that is, connSocketEventHandler:

    static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
    {
        connection *conn = clientData;

        if (conn->state == CONN_STATE_CONNECTING &&
                (mask & AE_WRITABLE) && conn->conn_handler) {

            if (connGetSocketError(conn)) {
                conn->last_errno = errno;
                conn->state = CONN_STATE_ERROR;
            } else {
                conn->state = CONN_STATE_CONNECTED;
            }

            if (!conn->write_handler) aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);

            if (!callHandler(conn, conn->conn_handler)) return;
            conn->conn_handler = NULL;
        }

        /* Normally we execute the readable event first, and the writable
         * event later. This is useful as sometimes we may be able
         * to serve the reply of a query immediately after processing the
         * query.
         * However if WRITE_BARRIER is set in the mask, our application is
         * asking us to do the reverse: never fire the writable event
         * after the readable. In such a case, we invert the calls.
         * This is useful when, for instance, we want to do things
         * in the beforeSleep() hook, like fsync'ing a file to disk,
         * before replying to a client. */
        int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;

        int call_write = (mask & AE_WRITABLE) && conn->write_handler;
        int call_read = (mask & AE_READABLE) && conn->read_handler;

        /* Handle normal I/O flows */
        if (!invert && call_read) {
            if (!callHandler(conn, conn->read_handler)) return;
        }
        /* Fire the writable event. */
        if (call_write) {
            if (!callHandler(conn, conn->write_handler)) return;
        }
        /* If we have to invert the call, fire the readable event now
         * after the writable one. */
        if (invert && call_read) {
            if (!callHandler(conn, conn->read_handler)) return;
        }
    }

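    connSocketEventHandler invokes both the read handler and the write handler, so it can be viewed as the processing hub for a socket connection. In other words, the epoll loop in the main framework never calls a client's read or write callback directly; it hands the triggered event down to connSocketEventHandler, which does the actual dispatch. This decouples the event framework from the socket connection. As the previous section mentioned, ConnectionType has two implementations: CT_Socket and CT_TLS.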


  1. We now know how reads are triggered in Redis. When is the write that replies to the client performed?


    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop) {
            if (eventLoop->beforesleep != NULL)
                eventLoop->beforesleep(eventLoop);  // runs once before every poll for events; this hook is the beforeSleep function
            aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
        }
    }


    /* This function gets called every time Redis is entering the
     * main loop of the event driven library, that is, before to sleep
     * for ready file descriptors. */
    void beforeSleep(struct aeEventLoop *eventLoop) {
        /* Handle precise timeouts of blocked clients. */
        handleBlockedClientsTimeout();

        /* We should handle pending reads clients ASAP after event loop. */
        handleClientsWithPendingReadsUsingThreads();

        /***************** some code omitted ****************/

        /* Handle writes with pending output buffers. */
        handleClientsWithPendingWritesUsingThreads();

        /* Close clients that need to be closed asynchronous */
        freeClientsInAsyncFreeQueue();

        /* Before we are going to sleep, let the threads access the dataset by
         * releasing the GIL. Redis main thread will not touch anything at this
         * time. */
        if (moduleCount()) moduleReleaseGIL();
    }


    int handleClientsWithPendingWritesUsingThreads(void) {
        int processed = listLength(server.clients_pending_write);
        if (processed == 0) return 0; /* Return ASAP if there are no clients. */

        /* If we have just a few clients to serve, don't use I/O threads, but the
         * boring synchronous code. */
        if (stopThreadedIOIfNeeded()) {
            return handleClientsWithPendingWrites();  // with a single thread we go straight here
        }

        /* Start threads if needed. */
        if (!io_threads_active) startThreadedIO();

        if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);

        /* Distribute the clients across N different lists. */
        listIter li;
        listNode *ln;
        listRewind(server.clients_pending_write,&li);
        int item_id = 0;
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            c->flags &= ~CLIENT_PENDING_WRITE;
            int target_id = item_id % server.io_threads_num;
            listAddNodeTail(io_threads_list[target_id],c);
            item_id++;
        }

        /* Give the start condition to the waiting threads, by setting the
         * start condition atomic var. */
        io_threads_op = IO_THREADS_OP_WRITE;
        for (int j = 1; j < server.io_threads_num; j++) {
            int count = listLength(io_threads_list[j]);
            io_threads_pending[j] = count;
        }

        /* Also use the main thread to process a slice of clients. */
        listRewind(io_threads_list[0],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            writeToClient(c,0);
        }
        listEmpty(io_threads_list[0]);

        /* Wait for all the other threads to end their work. */
        while(1) {
            unsigned long pending = 0;
            for (int j = 1; j < server.io_threads_num; j++)
                pending += io_threads_pending[j];
            if (pending == 0) break;
        }
        if (tio_debug) printf("I/O WRITE All threads finshed\n");

        /* Run the list of clients again to install the write handler where
         * needed. */
        listRewind(server.clients_pending_write,&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);

            /* Install the write handler if there are pending writes in some
             * of the clients. */
            if (clientHasPendingReplies(c) &&
                    connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)  // replies not fully sent: install write_handler to send the rest
            {
                freeClientAsync(c);
            }
        }
        listEmpty(server.clients_pending_write);
        return processed;
    }


    /* This function is called just before entering the event loop, in the hope
     * we can just write the replies to the client output buffer without any
     * need to use a syscall in order to install the writable event handler,
     * get it called, and so forth. */
    int handleClientsWithPendingWrites(void) {
        listIter li;
        listNode *ln;
        int processed = listLength(server.clients_pending_write);

        listRewind(server.clients_pending_write,&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            c->flags &= ~CLIENT_PENDING_WRITE;
            listDelNode(server.clients_pending_write,ln);

            /* If a client is protected, don't do anything,
             * that may trigger write error or recreate handler. */
            if (c->flags & CLIENT_PROTECTED) continue;

            /* Try to write buffers to the client socket. */
            if (writeToClient(c,0) == C_ERR) continue;  // the write failed: skip this client and move on to the next one

            /* If after the synchronous writes above we still have data to
             * output to the client, we need to install the writable handler. */
            if (clientHasPendingReplies(c)) {
                int ae_barrier = 0;
                /* For the fsync=always policy, we want that a given FD is never
                 * served for reading and writing in the same event loop iteration,
                 * so that in the middle of receiving the query, and serving it
                 * to the client, we'll call beforeSleep() that will do the
                 * actual fsync of AOF to disk. the write barrier ensures that. */
                if (server.aof_state == AOF_ON &&
                    server.aof_fsync == AOF_FSYNC_ALWAYS)
                {
                    ae_barrier = 1;
                }
                if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
                    freeClientAsync(c);
                }
            }
        }
        return processed;
    }


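    Like most servers, Redis does not reply as soon as it has finished processing a request; the reply is sent just before the next epoll wait. But Redis is single-threaded, so what is the purpose of doing it this way? Is it to be fair to other clients? The comment above handleClientsWithPendingWrites gives at least one reason: the hope is to write the reply straight to the socket without first paying for a syscall to install the writable event handler and waiting for it to fire.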




