redis源码阅读三-终于把主线任务执行搞明白了
Posted 5ycode
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了redis源码阅读三-终于把主线任务执行搞明白了相关的知识,希望对你有一定的参考价值。
在redis源码阅读二-终于把redis的启动流程搞明白了 介绍redis的启动流程,也画了一张图。今天我来详细讲解下redis的主线任务是怎么执行的。
我们先看一下流程图
在redis的主线任务里,主要有三大块:
-
eventLoop->beforesleep 创建回调的写事件并绑定处理器sendReplyToClient,在handleClientsWithPendingWrites
-
aeProcessEvents 执行整主流程,主要功能
-
从epoll读取fd,将读取的数据写入server.clients
-
监听对外暴露的ip和端口(tcp socket )通过acceptTcpHandler来监听新的请求,并创建fd
-
监听到就绪事件后通过readQueryFromClient解析并执行命令
-
processTimeEvents,定时任务执行,在aeProcessEvents内部
/**
* @brief 创建文件事件,并将fd加入到对应的poll里
* 回调以后执行对应的处理器
* @param eventLoop
* @param fd 对应的请求的fd
* @param mask 事件类型
* @param proc poll回调后执行的处理器
* @param clientData 和client绑定(引用地址)
* @return int
*/
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
if (fd >= eventLoop->setsize)
errno = ERANGE;
return AE_ERR;
// 将创建的aeFileEvent 插入到对应的事件表对应的位置eventLoop->events[fd](按tcp的原理同一个fd不会出现两次)
aeFileEvent *fe = &eventLoop->events[fd];
//添加到操作系统的poll里,每个操作系统都有自己的实现,最终是把监听的对应的fd的事件放入到eventLoop.apidata
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
// 将处理器给rfileProc和wfileProc(后续会拿着这个回调执行)
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
//重新对maxfd赋值
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
主流程处理代码分析-aeProcessEvents
在ae.c中
/**
* @brief 执行eventLoop
*
* @param eventLoop
*/
void aeMain(aeEventLoop *eventLoop)
eventLoop->stop = 0;
//只要没有停止,就循环执行,这个是主线程
while (!eventLoop->stop)
if (eventLoop->beforesleep != NULL)
//每次循环前执行beforesleep
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
我们按照顺序一个个的来看下对应的源代码
再看下ae.c中的aeProcessEvents
/**
* @brief 处理eventLoop里等待的 定时任务,文件事件,过期事件
* @param eventLoop
* @param flags 事件类型,
* 从main中过来是所有的事件,AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP
* 从networking过来是文件事件 AE_FILE_EVENTS|AE_DONT_WAIT
* @return int
*/
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
int processed = 0, numevents;
/* Nothing to do? return ASAP */
//针对事件类型判断
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
/**
* @brief 这块的意思是:
* eventLoop 有监听的tcp 或者(flags是时间事件且等待处理)
* 去获取下一次要执行的timer任务,如果有,计算出间隔时间,
* 那在等待读写事件的时候,就最多阻塞对应的时间
* 否则就一直阻塞直到有任务到达(如果本身epoll设置有超时时间?)
*/
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT)))
int j;
aeTimeEvent *shortest = NULL;
//时间
struct timeval tv, *tvp;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
// 根据shortest计算出差值事件tvp
/**
* @brief 从epoll里拿到numevents数量的数据,会把任务放到fired里,这里并没有指定事件类型
* 有时间,就阻塞指定的时间,
* 没有时间,直到有数据,这时定时任务也不用执行(redis是极简主义,猜测作者的意图,你都没有读写请求,我还处理定期任务干啥)
*/
numevents = aeApiPoll(eventLoop, tvp);
/* After sleep callback. */
//执行aftersleep
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
/**
* @brief 执行触发的事件
*/
for (j = 0; j < numevents; j++)
/**
* @brief 从注册的事件表中拿到对应fd的事件,插入逻辑在aeCreateFileEvent里
*/
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */
//反转主要是为了持久化设置的,在持久化的时候,设置的屏障,等后续看持久化的代码再详解 TODO
int invert = fe->mask & AE_BARRIER;
/**
* @brief 读事件(AE_READABLE)处理器包括:
* 由acceptTcpHandler处理的,在initServer里创建
* 由readQueryFromClient处理的,在acceptTcpHandler.acceptCommonHandler.createClient里创建
* TODO 其他的读事件后续分析
*
* 写事件(AE_WRITABLE)处理器包括:
* 由sendReplyToClient 处理的,在beforeSleep->handleClientsWithPendingWrites里创建
* TODO 其他的写事件后续分析
*/
if (!invert && fe->mask & mask & AE_READABLE)
//rfileProc和wfileProc由aeCreateFileEvent 创建fe时,传入的处理器,和fd绑定的,这里的fe->clientData 就是client,在aeCreateFileEvent里
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
/* Fire the writable event. */
//处理写事件
if (fe->mask & mask & AE_WRITABLE)
if (!fired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
// 反转调用,
if (invert && fe->mask & mask & AE_READABLE)
if (!fired || fe->wfileProc != fe->rfileProc)
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
//处理次数+1
processed++;
/* Check time events */
//处理定期任务主要是serverCron
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
我们先看下aeEventLoop,有助于理解
/**
* @brief 事件管理器,整个进程只有一个
*/
typedef struct aeEventLoop
//最大的fd
int maxfd; /* highest file descriptor currently registered */
//最多持有这么多连接(最大链接+128),events和fired 数组的大小
int setsize; /* max number of file descriptors tracked */
//记录最大的定时事件id(放几个为几),存放定时事件会自增
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
//已注册的文件事件处理器,在initServer里,一个fd绑定一个
aeFileEvent *events; /* Registered events */
//触发的的事件(在ae中会把所有从epoll里拉取到的事件丢到这里)
aeFiredEvent *fired; /* Fired events */
//定时事件链表的头节点
aeTimeEvent *timeEventHead;
//事件循环结束标识
int stop;
//epoll的数据,
void *apidata; /* This is used for polling API specific data */
//aeProcessEvents处理前执行(每循环一次执行一次)
aeBeforeSleepProc *beforesleep;
//aeApiPoll 后执行
aeBeforeSleepProc *aftersleep;
aeEventLoop;
我们看下aeApiPoll 是如何从epoll里拿数据的?
在此之前,我们得先了解下redis是如何选择的。在ae.c文件中
/**
* @brief ae的几种实现
* redis按照性能从上到下排序
* evport: 支持Solaris
* epoll: 支持linux
* kqueue: 支持FreeBSD 系统 如macos
* select: 都不包含就是select
*/
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
由于我的电脑是mac,直接定位到kqueue。我们看下ae_kqueue.c中的aeApiPoll
/**
* @brief 单位时间内获取事件数量
* @param eventLoop
* @param tvp 单位时间
* @return int 返回待处理的事件数量
*/
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
/**
* @brief 获取已经就绪的事件数据,是和fd绑定的,fd已经绑定了对应的处理器
* timeout指针为空,那么kevent()会永久阻塞,直到事件发生
* timeout有值,那么最多阻塞这么长时间
*/
if (tvp != NULL)
struct timespec timeout;
timeout.tv_sec = tvp->tv_sec;
timeout.tv_nsec = tvp->tv_usec * 1000;
retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize,
&timeout);
else
retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize,
NULL);
//将事件填充到eventLoop->fired中
if (retval > 0)
int j;
numevents = retval;
for(j = 0; j < numevents; j++)
int mask = 0;
struct kevent *e = state->events+j;
if (e->filter == EVFILT_READ) mask |= AE_READABLE;
if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE;
// 这里的ident是创建事件的时候赋值的
eventLoop->fired[j].fd = e->ident;
eventLoop->fired[j].mask = mask;
return numevents;
处理器
acceptTcpHandler
源码位置:networking.c中
注册时机:启动时initServer
处理内容:
-
监听开放的ip和端口,拿到请求后创建fd,
-
根据fd创建文件事件,并给fd绑定回调函数readQueryFromClient,用于处理AE_READABLE事件
-
创建client,并绑定readQueryFromClient到该fd上,当监听到这个fd的AE_READABLE事件后,回调
我们看下acceptTcpHandler如何处理(关键点)
/**
* @brief tcp处理器
* @param el
* @param fd 当前tcp的fd
* @param privdata 对应epoll数据
* @param mask
*/
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask)
/**
* cport 当前的端口
* cfd 当前的fd
* max 一次最多处理1000
*/
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);
//取tcp请求
while(max--)
/**
* @brief 监听tcp socket ,获取一个新的fd,后续再研究下这里 TODO
* 新的fd就是一个有效的链接
*/
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR)
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
//针对新监听到的请求处理cfd
acceptCommonHandler(cfd,0,cip);
/**
* @brief 针对新监听到的请求(fd)处理
* 主要是创建file event 让readQueryFromClient 监听AE_READABLE 事件
* 并将client加入到server.clients的队尾
* @param fd 针对
* @param flags
* @param ip
*/
static void acceptCommonHandler(int fd, int flags, char *ip)
client *c;
// 根据监听到的请求fd创建客户端,并扔到server.clients队尾,如果有事务业务初始化
if ((c = createClient(fd)) == NULL)
serverLog(LL_WARNING,
"Error registering fd event for the new client: %s (fd=%d)",
strerror(errno),fd);
close(fd); /* May be already closed, just ignore errors */
return;
//超了maxclients,直接就不处理了
if (listLength(server.clients) > server.maxclients)
char *err = "-ERR max number of clients reached\\r\\n";
//拒绝链接数+1
server.stat_rejected_conn++;
freeClient(c);
return;
//链接数+1
server.stat_numconnections++;
c->flags |= flags;
最关键的在createClient
/**
* @brief 根据fd创建客户端信息(重要)
* 1,设置新的请求为非阻塞,无延迟,并设置KeepAlive为erver.tcpkeepalive
* 2,根据fd创建一个file event 并给fd绑定回调函数readQueryFromClient,当监听到这个fd的AE_READABLE事件后,回调
* 3,根据此fd创建一个client,并将该client放入到server.clients的队尾
* @param fd tcp对应的fd
* @return client*
*/
client *createClient(int fd)
client *c = zmalloc(sizeof(client));
if (fd != -1)
//设置tcp非阻塞
anetNonBlock(NULL,fd);
//设置tcp无延迟
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive)
//设置tcp的KeepAlive
anetKeepAlive(NULL,fd,server.tcpkeepalive);
//注册一个file event, 由回调函数readQueryFromClient 去处理该fd的AE_READABLE事件
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
//创建失败,就关闭并释放
close(fd);
zfree(c);
return NULL;
selectDb(c,0);
uint64_t client_id;
//原子获取client_id
atomicGetIncr(server.next_client_id,client_id,1);
c->id = client_id;
//client上绑定的是新请求,也就是对应请求的fd
c->fd = fd;
......
//解析tcp后flags是0
c->flags = 0;
c->ctime = c->lastinteraction = server.unixtime;
......
//将新生产的client放入server.clients队尾
if (fd != -1) linkClient(c);
//初始化事务
initClientMultiState(c);
return c;
对应流程图如下:
命令执行 readQueryFromClient
源码位置:networking.c中
注册时机:在acceptTcpHandler里监听到请求后,拿到fd,绑定readQueryFromClient
处理内容:
-
从tcp缓冲区读取信息并解析到client里
-
查找命令并执行在server.c中的processCommand里
触发代码
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
/**
* @brief 读取客户端信息
* @param el
* @param fd 对应请求的fd
* @param privdata client客户端
* @param mask
*/
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask)
//处理输入流
processInputBufferAndReplicate(c);
void processInputBufferAndReplicate(client *c)
//master节点
if (!(c->flags & CLIENT_MASTER))
//处理输入缓冲区(主要看这里)
processInputBuffer(c);
else
//集群同步复制
size_t prev_offset = c->reploff;
processInputBuffer(c);
//有数据才同步
size_t applied = c->reploff - prev_offset;
if (applied)
replicationFeedSlavesFromMasterStream(server.slaves,
c->pending_querybuf, applied);
sdsrange(c->pending_querybuf,applied,-1);
/**
* @brief 处理输入内容
*
* @param c
*/
void processInputBuffer(client *c)
/**
* @brief 根据不同类型从缓冲区读取数据
*/
if (c->reqtype == PROTO_REQ_INLINE)
//根据不同类型从缓冲区读取数据,并将客户端执行内容解析到成robj并出入c->argv
if (processInlineBuffer(c) != C_OK) break;
else if (c->reqtype == PROTO_REQ_MULTIBULK)
if (processMultibulkBuffer(c) != C_OK) break;
else
serverPanic("Unknown request type");
if (processCommand(c) == C_OK)
//设置为空,下一个client可以继续
server.current_client = NULL;
int processCommand(client *c)
//从server.commands字典里查询命令执行命令的映射,c->argv[0]为命令名称
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
//找不到命令就未知命令异常,参数不对,也异常
//其他一堆逻辑判断
/**
* @brief 执行命令
* 开始了事务(CLIENT_MULTI)直接放入Multi队列
*
*/
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
queueMultiCommand(c);
addReply(c,shared.queued);
else
//执行命令回调
call(c,CMD_CALL_FULL);
//
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnKeys();
//命令执行
void call(client *c, int flags)
//命令执行,会执行对应的redisCommand
c->cmd->proc(c);
看下面的代码,proc就是第二个参数,就是对应命令的具体方法
struct redisCommand redisCommandTable[] =
"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0,
"get",getCommand,2,"rF",0,NULL,1,1,1,0,0,
"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0,
"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0,
"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0,
......
struct redisCommand
char *name;
redisCommandProc *proc;
int arity;
char *sflags; /* Flags as string representation, one char per flag. */
int flags; /* The actual flags, obtained from the 'sflags' field. */
redisGetKeysProc *getkeys_proc;
int firstkey; /* The first argument that's a key (0 = no keys) */
int lastkey; /* The last argument that's a key */
int keystep; /* The step between first and last key */
long long microseconds, calls;
;
定时任务 serverCron
源码位置:server.c中
注册时机:initServer()
/**
* @brief 时间事件执行
* @param eventLoop fd
* @param id fd
* @param clientData
* @return int
*/
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData)
/**
* @brief 处理server.clients 里的任务
* 这里尾结点反转到头结点,因为clients采用的尾插法,如果最后一个过期了,那之前的请求响应客户端都过期了
* 1,处理超期任务
* 2,回收查询缓冲区
* 3,先不管,TODO
*/
clientsCron();
/**
* 1,处理过期key,发送过期事件并删除
* 2,内存碎片整理
* 3,rehash
*/
databasesCron();
//rdb操作
rdbSaveBackground(server.rdb_filename,rsiptr);
//aof
rewriteAppendOnlyFileBackground();
//释放需要异步释放的客户端链表
freeClientsInAsyncFreeQueue();
//其他采样
回写事件
接收任务都搞定了,那我们再看一个回写事件。就拿get命令来看
server.c中
"get",getCommand,2,"rF",0,NULL,1,1,1,0,0,
t_string.c中
void getCommand(client *c)
getGenericCommand(c);
int getGenericCommand(client *c)
robj *o;
//为空直接返回
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL)
return C_OK;
//不为空,
if (o->type != OBJ_STRING)
//添加异常回复
addReply(c,shared.wrongtypeerr);
return C_ERR;
else
//添加到响应队列
addReplyBulk(c,o);
return C_OK;
最后都转到了这里
networking.c
/**
* @brief 添加回复
* @param c
* @param obj robj
*/
void addReply(client *c, robj *obj)
//判断并把要写回的客户端写入到server.clients_pending_write
if (prepareClientToWrite(c) != C_OK) return;
//如果回复的内容是sds编码
if (sdsEncodedObject(obj))
/**
* @brief 先尝试使用_addReplyToBuffer 写入缓冲区,写入失败再_addReplyStringToList
*/
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
_addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
else if (obj->encoding == OBJ_ENCODING_INT)
//回复的内容是编码是int(真省空间)
char buf[32];
size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
if (_addReplyToBuffer(c,buf,len) != C_OK)
_addReplyStringToList(c,buf,len);
else
serverPanic("Wrong obj->encoding in addReply()");
int prepareClientToWrite(client *c)
/**
* 如果c->bufpos 和c->reply,说明这个客户端之前已经放入了等待写队列server.clients_pending_write
*/
if (!clientHasPendingReplies(c))
// 将客户用头插法写入server.clients_pending_write
clientInstallWriteHandler(c);
/**
* @brief 将响应内容写入c->buf数组,buf长度有限制,只有16kb
* @param c
* @param s
* @param len
* @return int
*/
int _addReplyToBuffer(client *c, const char *s, size_t len)
size_t available = sizeof(c->buf)-c->bufpos;
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;
/* If there already are entries in the reply list, we cannot
* add anything more to the static buffer. */
//在c->reply里有值,就不管
if (listLength(c->reply) > 0) return C_ERR;
/* Check that the buffer has enough space available for this string. */
if (len > available) return C_ERR;
//复制s到c->buf
memcpy(c->buf+c->bufpos,s,len);
//表示输出缓冲区的大小
c->bufpos+=len;
return C_OK;
/**
* @brief 将响应内容输入到c->reply链表里
*
* @param c
* @param s
* @param len
*/
void _addReplyStringToList(client *c, const char *s, size_t len)
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;
listNode *ln = listLast(c->reply);
//获取队尾
clientReplyBlock *tail = ln? listNodeValue(ln): NULL;
/**
* @brief reply 已经有了数据,就后写,
* 如果clientReplyBlock剩余的空间不够,就再新建一个
* clientReplyBlock 最少16k,如果响应字符串较小,一个填充不完
*/
if (tail)
size_t avail = tail->size - tail->used;
size_t copy = avail >= len? len: avail;
memcpy(tail->buf + tail->used, s, copy);
tail->used += copy;
s += copy;
len -= copy;
//len有数据,要么往已有的追加,还有剩余,要么是因为tail为null
if (len)
size_t size = len < PROTO_REPLY_CHUNK_BYTES? PROTO_REPLY_CHUNK_BYTES: len;
tail = zmalloc(size + sizeof(clientReplyBlock));
/* take over the allocation's internal fragmentation */
tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock);
tail->used = len;
memcpy(tail->buf, s, len);
listAddNodeTail(c->reply, tail);
c->reply_bytes += tail->size;
//将客户端加入到server.clients_to_close
asyncCloseClientOnOutputBufferLimitReached(c);
这块就不画图了,代码很清晰
-
所有的响应都走networking.c中的addReply
-
如果对客户端有响应数据,直接将client写入到server.clients_pending_write
-
addReply 又根据不同的编码输入到不同响应缓冲区
从上面几个流程来看,redis5是单线程处理的
写回客户端
一直到这里,只是把客户端写入了server.clients_pending_write
写回的数据写入到了c->buf和c->reply。
但是并没有写会客户端啊,别急,还记得我们aeMain里的beforeSleep么?
先看下server.c中的beforeSleep
/**
* @brief 循环处理前执行
*
* @param eventLoop
*/
void beforeSleep(struct aeEventLoop *eventLoop)
//激活快循环
if (server.active_expire_enabled && server.masterhost == NULL)
//执行快循环
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
......
//将aof缓冲区写入磁盘
flushAppendOnlyFile(0);
/* Handle writes with pending output buffers. */
//处理等待的回写队列(最后再看)
handleClientsWithPendingWrites();
在networking.c中
int handleClientsWithPendingWrites(void)
int processed = listLength(server.clients_pending_write);
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li)))
if (writeToClient(c->fd,c,0) == C_ERR) continue;
int writeToClient(int fd, client *c, int handler_installed)
while(clientHasPendingReplies(c))
//c->buf 或c->reply有数据,二选一,下面是处理。
if (c->bufpos > 0)
//直接调用底层write将数据写入了fd,响应给客户端
nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
else
o = listNodeValue(listFirst(c->reply));
objlen = o->used;
if (objlen == 0)
c->reply_bytes -= o->size;
listDelNode(c->reply,listFirst(c->reply));
continue;
//直接调用底层write将数据写入了fd,响应给客户端
nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
至此,整个redis,监听、接收请求、执行命令、写回数据形成了闭环
redis源码系列文章
后续会持续更新,感兴趣的小伙伴,可以一键三连哦。
以上是关于redis源码阅读三-终于把主线任务执行搞明白了的主要内容,如果未能解决你的问题,请参考以下文章