Redis Source Code Analysis, Part 2: Read and Write Replies
Posted 毛毛and西西
1. The read callback

The client message-processing loop
As the previous section showed, Redis uses epoll to detect incoming client connections. When one arrives, `acceptTcpHandler` is called, which in turn calls `createClient` to tie the fd to a client object and append it to the global `server.clients` list. Inside `createClient`, `connSetReadHandler(conn, readQueryFromClient)` is called to add the client to epoll. As we saw there, the `set_read_handler` implementation binds `ae_handler`, i.e. `connSocketEventHandler`, as the fd's read/write callback:

```c
static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
{
    UNUSED(el);
    UNUSED(fd);
    connection *conn = clientData;

    if (conn->state == CONN_STATE_CONNECTING &&
            (mask & AE_WRITABLE) && conn->conn_handler) {

        if (connGetSocketError(conn)) {
            conn->last_errno = errno;
            conn->state = CONN_STATE_ERROR;
        } else {
            conn->state = CONN_STATE_CONNECTED;
        }

        if (!conn->write_handler) aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);

        if (!callHandler(conn, conn->conn_handler)) return;
        conn->conn_handler = NULL;
    }

    /* Normally we execute the readable event first, and the writable
     * event later. This is useful as sometimes we may be able
     * to serve the reply of a query immediately after processing the
     * query.
     *
     * However if WRITE_BARRIER is set in the mask, our application is
     * asking us to do the reverse: never fire the writable event
     * after the readable. In such a case, we invert the calls.
     * This is useful when, for instance, we want to do things
     * in the beforeSleep() hook, like fsync'ing a file to disk,
     * before replying to a client. */
    int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;

    int call_write = (mask & AE_WRITABLE) && conn->write_handler;
    int call_read = (mask & AE_READABLE) && conn->read_handler;

    /* Handle normal I/O flows */
    if (!invert && call_read) {
        if (!callHandler(conn, conn->read_handler)) return;
    }
    /* Fire the writable event. */
    if (call_write) {
        if (!callHandler(conn, conn->write_handler)) return;
    }
    /* If we have to invert the call, fire the readable event now
     * after the writable one. */
    if (invert && call_read) {
        if (!callHandler(conn, conn->read_handler)) return;
    }
}
```
Since `connSocketEventHandler` contains both the read call and the write call, we can view it as the processing hub of a socket connection. In other words, the main framework's epoll loop never invokes the client's read/write callbacks directly; it hands the triggered event down to `connSocketEventHandler` for concrete handling, which decouples the framework from the socket connection. As mentioned in the previous section, `ConnectionType` has two implementations: `CT_Socket` and `CT_TLS`.
2. The write callback

We now know how Redis's read path is triggered, but when does the reply to the client actually get written?
Redis does not reply to the client immediately after fetching the data; the reply is sent on the "next" loop iteration. Look again at the implementation of `aeMain`:

```c
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop); /* run once before each poll; this is beforeSleep() */
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
```
So before every epoll wait there is one call to `beforeSleep`. This function does quite a lot; here we only look at the parts related to replies:

```c
/* This function gets called every time Redis is entering the
 * main loop of the event driven library, that is, before to sleep
 * for ready file descriptors. */
void beforeSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);

    /* Handle precise timeouts of blocked clients. */
    handleBlockedClientsTimeout();

    /* We should handle pending reads clients ASAP after event loop. */
    handleClientsWithPendingReadsUsingThreads();

    /***************** some code omitted ****************/

    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWritesUsingThreads();

    /* Close clients that need to be closed asynchronous */
    freeClientsInAsyncFreeQueue();

    /* Before we are going to sleep, let the threads access the dataset by
     * releasing the GIL. Redis main thread will not touch anything at this
     * time. */
    if (moduleCount()) moduleReleaseGIL();
}
```
Note the calls to `handleClientsWithPendingReadsUsingThreads` and `handleClientsWithPendingWritesUsingThreads`. We only follow the write side here; as an aside, when threaded reads are enabled, routing clients across the I/O threads is done in `handleClientsWithPendingReadsUsingThreads`.

```c
int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    /* If we have just a few clients to serve, don't use I/O threads, but the
     * boring synchronous code. */
    if (stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites(); /* single-threaded: go straight here */
    }

    /* Start threads if needed. */
    if (!io_threads_active) startThreadedIO();

    if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O WRITE All threads finshed\n");

    /* Run the list of clients again to install the write handler where
     * needed. */
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        /* Install the write handler if there are pending writes in some
         * of the clients. */
        if (clientHasPendingReplies(c) &&
            connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        { /* for replies not fully sent, install write_handler to finish sending */
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);
    return processed;
}
```
`handleClientsWithPendingWrites` implements the single-threaded reply path: if the socket send buffer fills up and the reply cannot be sent in one go, a write callback, `sendReplyToClient`, is installed; that callback is eventually invoked from the `connSocketEventHandler` we saw at the beginning of this article.

```c
/* This function is called just before entering the event loop, in the hope
 * we can just write the replies to the client output buffer without any
 * need to use a syscall in order to install the writable event handler,
 * get it called, and so forth. */
int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);

    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write,ln);

        /* If a client is protected, don't do anything,
         * that may trigger write error or recreate handler. */
        if (c->flags & CLIENT_PROTECTED) continue;

        /* Try to write buffers to the client socket. */
        if (writeToClient(c,0) == C_ERR) continue;

        /* If after the synchronous writes above we still have data to
         * output to the client, we need to install the writable handler. */
        if (clientHasPendingReplies(c)) { /* not fully sent: keep sending later */
            int ae_barrier = 0;
            /* For the fsync=always policy, we want that a given FD is never
             * served for reading and writing in the same event loop iteration,
             * so that in the middle of receiving the query, and serving it
             * to the client, we'll call beforeSleep() that will do the
             * actual fsync of AOF to disk. the write barrier ensures that. */
            if (server.aof_state == AOF_ON &&
                server.aof_fsync == AOF_FSYNC_ALWAYS)
            {
                ae_barrier = 1;
            }
            if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
                freeClientAsync(c);
            }
        }
    }
    return processed;
}
```
3. Summary

Like most servers, Redis does not reply right after processing a request; replies are flushed just before the next epoll wait. But Redis's command processing is single-threaded, so what is the point of deferring? Is it just to be fair to the other clients? The comment above `handleClientsWithPendingWrites` suggests the main reason: by batching replies in `beforeSleep`, most of them can be written straight to the socket, so in the common case there is no need for the extra syscalls of installing a writable event handler and waiting for it to fire.