Redis Source Code Analysis (2): Reading Queries and Writing Replies

Posted by 毛毛and西西


一、The read callback

  1. The client message-processing loop

    As we saw in the previous section, Redis uses epoll to detect incoming client connections. When a connection arrives, acceptTcpHandler is invoked, which calls createClient to associate the fd with a client object and append it to the global server.clients list. Inside createClient, connSetReadHandler(conn, readQueryFromClient) is called to register the client with epoll.
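    Stripped of all Redis specifics, the flow above boils down to: register the fd with epoll together with some per-connection state, and let the event loop dispatch readable events to a read handler. A self-contained sketch (plain Linux epoll, not Redis code; serve_one_event is a made-up name, and a socketpair stands in for an accepted connection):

    ```c
    #include <stdio.h>
    #include <string.h>
    #include <sys/epoll.h>
    #include <sys/socket.h>
    #include <unistd.h>

    /* Register a "client" fd with epoll, wait for it to become readable,
     * and hand it to a read handler, as createClient/connSetReadHandler do
     * conceptually. Returns the number of bytes the handler received. */
    static int serve_one_event(char *out, size_t outlen) {
        int fds[2];
        if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) return -1;

        int epfd = epoll_create1(0);
        struct epoll_event ev = {0};
        ev.events = EPOLLIN;
        ev.data.fd = fds[0];                 /* Redis stores the connection object here */
        epoll_ctl(epfd, EPOLL_CTL_ADD, fds[0], &ev);

        write(fds[1], "PING", 4);            /* the peer sends a query */

        struct epoll_event fired;
        int nread = -1;
        if (epoll_wait(epfd, &fired, 1, 1000) == 1 && (fired.events & EPOLLIN))
            nread = (int)read(fired.data.fd, out, outlen - 1);  /* the "readQueryFromClient" step */
        if (nread >= 0) out[nread] = '\0';

        close(fds[0]); close(fds[1]); close(epfd);
        return nread;
    }

    int main(void) {
        char buf[16];
        int n = serve_one_event(buf, sizeof(buf));
        printf("read handler got %d bytes: %s\n", n, buf);
        return 0;
    }
    ```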

    connSetReadHandler goes through the connection type's set_read_handler interface. The key point is that what actually gets bound to the fd's read and write events is ae_handler, which for plain sockets is connSocketEventHandler:

    static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
    {
        UNUSED(el);
        UNUSED(fd);
        connection *conn = clientData;
    
        if (conn->state == CONN_STATE_CONNECTING &&
                (mask & AE_WRITABLE) && conn->conn_handler) {
    
            if (connGetSocketError(conn)) {
                conn->last_errno = errno;
                conn->state = CONN_STATE_ERROR;
            } else {
                conn->state = CONN_STATE_CONNECTED;
            }
    
            if (!conn->write_handler) aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
    
            if (!callHandler(conn, conn->conn_handler)) return;
            conn->conn_handler = NULL;
        }
    
        /* Normally we execute the readable event first, and the writable
         * event later. This is useful as sometimes we may be able
         * to serve the reply of a query immediately after processing the
         * query.
         *
         * However if WRITE_BARRIER is set in the mask, our application is
         * asking us to do the reverse: never fire the writable event
         * after the readable. In such a case, we invert the calls.
         * This is useful when, for instance, we want to do things
         * in the beforeSleep() hook, like fsync'ing a file to disk,
         * before replying to a client. */
        int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;
    
        int call_write = (mask & AE_WRITABLE) && conn->write_handler;
        int call_read = (mask & AE_READABLE) && conn->read_handler;
    
        /* Handle normal I/O flows */
        if (!invert && call_read) {
            if (!callHandler(conn, conn->read_handler)) return;
        }
        /* Fire the writable event. */
        if (call_write) {
            if (!callHandler(conn, conn->write_handler)) return;
        }
        /* If we have to invert the call, fire the readable event now
         * after the writable one. */
        if (invert && call_read) {
            if (!callHandler(conn, conn->read_handler)) return;
        }
    }
    

    connSocketEventHandler handles both read and write events, so it can be viewed as the processing hub for a socket connection. In other words, the main loop's epoll never invokes the client's read/write callbacks directly; the triggered event is handed down to connSocketEventHandler for dispatch, which decouples the event framework from the socket connection layer. As mentioned in the previous section, ConnectionType has two implementations: CT_Socket and CT_TLS.
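    The read/write ordering and the CONN_FLAG_WRITE_BARRIER inversion can be modeled in isolation. In this toy model the AE_READABLE/AE_WRITABLE values mirror the real flags, but dispatch is a made-up stand-in for connSocketEventHandler that just records which handler would fire first:

    ```c
    #include <stdio.h>
    #include <string.h>

    #define AE_READABLE 1
    #define AE_WRITABLE 2

    /* Record the order in which the read (R) and write (W) handlers fire for
     * a given event mask, with and without the write barrier. */
    static void dispatch(int mask, int invert, char *log) {
        log[0] = '\0';
        int call_write = mask & AE_WRITABLE;
        int call_read  = mask & AE_READABLE;
        if (!invert && call_read) strcat(log, "R");  /* normal order: read first */
        if (call_write)           strcat(log, "W");
        if (invert && call_read)  strcat(log, "R");  /* barrier: read after write */
    }

    int main(void) {
        char log[8];
        dispatch(AE_READABLE | AE_WRITABLE, 0, log);
        printf("normal:  %s\n", log);   /* RW */
        dispatch(AE_READABLE | AE_WRITABLE, 1, log);
        printf("barrier: %s\n", log);   /* WR */
        return 0;
    }
    ```

    The barrier order matters for AOF fsync=always: the reply must not be written before beforeSleep() has fsync'ed the command to disk.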

二、The write callback

  1. We now know how Redis reads are triggered, but when does the reply to the client actually get written?

    In Redis, data is not written back to the client immediately after the query is processed; the reply is sent on the "next" pass of the loop. Look again at the implementation of aeMain:

    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop) {
            if (eventLoop->beforesleep != NULL)
                eventLoop->beforesleep(eventLoop);  // run beforesleep once before each poll; this is the beforeSleep function
            aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
        }
    }
    

    So beforeSleep is called once before each epoll wait. This function does quite a lot; here we only look at the parts related to replies:

    /* This function gets called every time Redis is entering the
     * main loop of the event driven library, that is, before to sleep
     * for ready file descriptors. */
    void beforeSleep(struct aeEventLoop *eventLoop) {
        UNUSED(eventLoop);
    
        /* Handle precise timeouts of blocked clients. */
        handleBlockedClientsTimeout();
    
        /* We should handle pending reads clients ASAP after event loop. */
        handleClientsWithPendingReadsUsingThreads();
    	
        /***************** some code omitted ****************/
        
        /* Handle writes with pending output buffers. */
        handleClientsWithPendingWritesUsingThreads();
    
        /* Close clients that need to be closed asynchronous */
        freeClientsInAsyncFreeQueue();
    
        /* Before we are going to sleep, let the threads access the dataset by
         * releasing the GIL. Redis main thread will not touch anything at this
         * time. */
        if (moduleCount()) moduleReleaseGIL();
    }
    

    Both handleClientsWithPendingReadsUsingThreads and handleClientsWithPendingWritesUsingThreads appear here; we focus on the write path. As an aside, when threaded reads are enabled, the routing of clients across I/O threads is implemented in handleClientsWithPendingReadsUsingThreads.

    int handleClientsWithPendingWritesUsingThreads(void) {
        int processed = listLength(server.clients_pending_write);
        if (processed == 0) return 0; /* Return ASAP if there are no clients. */
    
        /* If we have just a few clients to serve, don't use I/O threads, but the
         * boring synchronous code. */
        if (stopThreadedIOIfNeeded()) {
            return handleClientsWithPendingWrites();  // the single-threaded path goes straight through here
        }
    
        /* Start threads if needed. */
        if (!io_threads_active) startThreadedIO();
    
        if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);
    
        /* Distribute the clients across N different lists. */
        listIter li;
        listNode *ln;
        listRewind(server.clients_pending_write,&li);
        int item_id = 0;
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            c->flags &= ~CLIENT_PENDING_WRITE;
            int target_id = item_id % server.io_threads_num;
            listAddNodeTail(io_threads_list[target_id],c);
            item_id++;
        }
    
        /* Give the start condition to the waiting threads, by setting the
         * start condition atomic var. */
        io_threads_op = IO_THREADS_OP_WRITE;
        for (int j = 1; j < server.io_threads_num; j++) {
            int count = listLength(io_threads_list[j]);
            io_threads_pending[j] = count;
        }
    
        /* Also use the main thread to process a slice of clients. */
        listRewind(io_threads_list[0],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            writeToClient(c,0);
        }
        listEmpty(io_threads_list[0]);
    
        /* Wait for all the other threads to end their work. */
        while(1) {
            unsigned long pending = 0;
            for (int j = 1; j < server.io_threads_num; j++)
                pending += io_threads_pending[j];
            if (pending == 0) break;
        }
        if (tio_debug) printf("I/O WRITE All threads finshed\n");
    
        /* Run the list of clients again to install the write handler where
         * needed. */
        listRewind(server.clients_pending_write,&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
    
            /* Install the write handler if there are pending writes in some
             * of the clients. */
            if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)  // for replies not fully sent, install write_handler to finish sending
            {
                freeClientAsync(c);
            }
        }
        listEmpty(server.clients_pending_write);
        return processed;
    }
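    The distribution step in the middle of this function is plain round-robin: client item_id modulo server.io_threads_num, with list 0 reserved for the slice the main thread processes itself. A stand-alone model of just that routing (client ids replace client objects; distribute is a hypothetical helper, not a Redis function):

    ```c
    #include <stdio.h>

    #define MAX_CLIENTS 16

    /* Spread nclients pending clients over nthreads lists by index modulo
     * nthreads, the same routing rule used for io_threads_list[]. */
    static void distribute(const int *clients, int nclients, int nthreads,
                           int lists[][MAX_CLIENTS], int *counts) {
        for (int t = 0; t < nthreads; t++) counts[t] = 0;
        for (int i = 0; i < nclients; i++) {
            int target = i % nthreads;                 /* round-robin routing */
            lists[target][counts[target]++] = clients[i];
        }
    }

    int main(void) {
        int clients[] = {10, 11, 12, 13, 14, 15, 16};  /* stand-in client ids */
        int lists[4][MAX_CLIENTS];
        int counts[4];
        distribute(clients, 7, 4, lists, counts);
        for (int t = 0; t < 4; t++)
            printf("thread %d gets %d clients\n", t, counts[t]);
        return 0;
    }
    ```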
    

    handleClientsWithPendingWrites implements the reply path for the single-threaded model: if the socket send buffer fills up and the reply cannot be fully written, a write callback, sendReplyToClient, is installed; that callback is later invoked from connSocketEventHandler, which we saw at the beginning of this article.

    /* This function is called just before entering the event loop, in the hope
     * we can just write the replies to the client output buffer without any
     * need to use a syscall in order to install the writable event handler,
     * get it called, and so forth. */
    int handleClientsWithPendingWrites(void) {
        listIter li;
        listNode *ln;
        int processed = listLength(server.clients_pending_write);
    
        listRewind(server.clients_pending_write,&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            c->flags &= ~CLIENT_PENDING_WRITE;
            listDelNode(server.clients_pending_write,ln);
    
            /* If a client is protected, don't do anything,
             * that may trigger write error or recreate handler. */
            if (c->flags & CLIENT_PROTECTED) continue;
    
            /* Try to write buffers to the client socket. */
            if (writeToClient(c,0) == C_ERR) continue;  // on a write error the client is being freed; skip it
    
            /* If after the synchronous writes above we still have data to
             * output to the client, we need to install the writable handler. */
            if (clientHasPendingReplies(c)) {
                int ae_barrier = 0;
                /* For the fsync=always policy, we want that a given FD is never
                 * served for reading and writing in the same event loop iteration,
                 * so that in the middle of receiving the query, and serving it
                 * to the client, we'll call beforeSleep() that will do the
                 * actual fsync of AOF to disk. the write barrier ensures that. */
                if (server.aof_state == AOF_ON &&
                    server.aof_fsync == AOF_FSYNC_ALWAYS)
                {
                    ae_barrier = 1;
                }
                if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
                    freeClientAsync(c);
                }
            }
        }
        return processed;
    }
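    The overall pattern, write synchronously first and fall back to a writable-event handler only when bytes remain, can be shown without any sockets. In this sketch, miniClient and tryWrite are invented stand-ins, and max_per_write simulates a full kernel send buffer:

    ```c
    #include <stdio.h>

    struct miniClient {
        char buf[64];       /* pending reply bytes */
        size_t sent;        /* bytes already "written" */
        size_t len;         /* total reply length */
    };

    /* Attempt one bounded write; returns 1 if data is still pending, i.e. a
     * write handler would have to be installed, and 0 once fully drained. */
    static int tryWrite(struct miniClient *c, size_t max_per_write) {
        size_t remaining = c->len - c->sent;
        size_t n = remaining < max_per_write ? remaining : max_per_write;
        c->sent += n;                 /* pretend write(2) accepted n bytes */
        return c->sent < c->len;      /* pending replies => need the handler */
    }

    int main(void) {
        struct miniClient c = { "+OK\r\n...a long reply...", 0, 23 };
        int need_handler = tryWrite(&c, 8);     /* first, synchronous attempt */
        printf("need handler after 1st write: %d\n", need_handler);
        while (tryWrite(&c, 8)) {}              /* later writable events drain it */
        printf("fully sent: %d\n", c.sent == c.len);
        return 0;
    }
    ```

    The point of the real version is the same: most replies fit in one synchronous write, so the syscalls needed to install and later remove a writable event handler are avoided entirely in the common case.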
    

三、Summary

Like most servers, Redis does not reply immediately after processing a request; it sends replies just before the next epoll wait. But Redis is single-threaded, so what is the purpose of deferring the write this way? Is it to be fair to the other clients?
