redis网络通信模块源码分析(下)
Posted 高性能服务器开发
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了redis网络通信模块源码分析(下)相关的知识,希望对你有一定的参考价值。
这里注册可写事件AE_WRITABLE的回调函数是sendReplyToClient。也就是说,当下一次某个触发可写事件时,调用的就是sendReplyToClient函数了。可以猜想,sendReplyToClient发送数据的逻辑和上面的writeToClient函数一模一样,不信请看(位于文件networking.c文件中):
/* Write event handler. Just send data to the client. */
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
UNUSED(el);
UNUSED(mask);
writeToClient(fd,privdata,1);
}
至此,redis-server发送数据的逻辑也理清楚了。这里简单做个总结:
如果有数据要发送给某个client,不需要专门注册可写事件,等触发可写事件再发送。通常的做法是,在应答数据产生的地方直接发送,如果是因为对端Tcp窗口太小引起的发送不完,则将剩余的数据存储至某个缓冲区并注册监听可写事件,等下次触发可写事件后再尝试发送,一直到数据全部发送完毕后移除可写事件。
redis-server数据的发送逻辑与这个稍微有点差别,就是将数据发送的时机放到了EventLoop的某个时间点上(这里是在ProcessEvents之前),其他的与上面完全一样。
之所以不注册监听可写事件,等可写事件触发再发送数据,原因是通常情况下,网络通信的两端数据一般都是正常收发的,一般不会出现某一端由于Tcp窗口太小而使另外一端发不出去的情况。如果注册监听可写事件,那么这个事件会频繁触发,而触发时不一定有数据需要发送,这样不仅浪费系统资源,同时也浪费服务器程序宝贵的CPU时间片。
定时器逻辑
一个网络通信模块是离不开定时器的,前面我们也介绍了在事件处理函数的中如何去除最早到期的定时器对象,这里我们接着这个问题继续讨论。在aeProcessEvents函数(位于文件ae.c中)的结尾处有这样一段代码:
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
如果存在定时器事件,则调用processTimeEvents函数(位于文件ae.c中)进行处理。
/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te, *prev;
long long maxId;
time_t now = time(NULL);
/* If the system clock is moved to the future, and then set back to the
* right value, time events may be delayed in a random way. Often this
* means that scheduled operations will not be performed soon enough.
*
* Here we try to detect system clock skews, and force all the time
* events to be processed ASAP when this happens: the idea is that
* processing events earlier is less dangerous than delaying them
* indefinitely, and practice suggests it is. */
if (now < eventLoop->lastTime) {
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
eventLoop->lastTime = now;
prev = NULL;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
while(te) {
long now_sec, now_ms;
long long id;
/* Remove events scheduled for deletion. */
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
if (prev == NULL)
eventLoop->timeEventHead = te->next;
else
prev->next = te->next;
if (te->finalizerProc)
te->finalizerProc(eventLoop, te->clientData);
zfree(te);
te = next;
continue;
}
/* Make sure we don't process time events created by time events in
* this iteration. Note that this check is currently useless: we always
* add new timers on the head, however if we change the implementation
* detail, this check may be useful again: we keep it here for future
* defense. */
if (te->id > maxId) {
te = te->next;
continue;
}
aeGetTime(&now_sec, &now_ms);
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
te->id = AE_DELETED_EVENT_ID;
}
}
prev = te;
te = te->next;
}
return processed;
}
这段代码核心逻辑就是通过eventLoop->timeEventHead中记录的定时器对象链表遍历每个定时器对象的时间,然后与当前时间比较,如果定时器已经到期,则调用定时器对象设置的回调函数timeProc进行处理。这段代码,没有什么特别需要注意的地方。但是代码中作者考虑到了一种特殊场景,就是假设有人将当前的计算机时间调到了未来某个时刻,然后再调回来。这样就会出现now(当前时间)小于eventLoop->lastTime(记录在aeEventLoop中的上一次时间)。出现这种情况怎么办呢?redis的作者,遍历该定时器对象链表,将这个链表中的所有定时器对象的时间设置成0。这样,这些定时器就会立即得到处理了。这也就是作者在代码注释中说的:
force all the time events to be processed ASAP
ASAP应该是英文As Soon As Possible(尽快)的缩写吧。
那么redis-server中到底哪些地方使用了定时器呢?我们可以在redis源码中搜索创建定时器的函数aeCreateTimeEvent,在initServer函数中有这么一行(位于server.c文件中):
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
上述代码前面的章节我们也提到过,原来定时器的用途是用于redis的Cron任务。这个任务具体做些什么工作,就不是本章节的内容了,有兴趣的读者可以阅读下serverCron函数源码(位于server.c中)。
aftersleep钩子
通常情形下,在一个EventLoop中除了有定时器、IO Multiplexing和IO事件处理逻辑外,可以根据需求自定义一些函数,这类函数我们称之为“钩子函数”。钩子函数可以位于Loop的任何位置,前面我们介绍的beforesleep函数就是在事件处理之前的自定义钩子函数(位于定时器时间检测逻辑之前)。
在redis-server中,在IO Multiplexing调用与IO事件处理逻辑之间也有一个自定义钩子函数叫aftersleep。
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
//无关代码省略...
numevents = aeApiPoll(eventLoop, tvp);
/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
for (j = 0; j < numevents; j++) {
//无关代码省略...
}
}
这个函数在main函数中设置:
int main(int argc, char **argv) {
//无关代码省略...
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
return 0;
}
由于afterSleep函数的具体作用与我们的网络通信无关,这里也就不再介绍了。
redis-server端网络通信模块小结
通过前面的讲解,我们用一张图来概括一下redis-server端的网络通信模型。
如上图所示,这就是典型的利用one loop one thread 思想实现的reactor网络通信模型,也是目前最主流的网络通信架构。而且由于redis-server的网络通信中所有的客户端fd和侦听fd都集中在一个EventLoop中,所以通常也说redis的网络通信模型是单线程的。
探究redis-cli端的网络通信模型
我们接着探究一下redis源码自带的客户端redis-cli的网络通信模块。
我们使用gdb把redis-cli跑起来以后,原来打算按Ctrl + C让gdb中断下来查看一下redis-cli跑起来有几个线程,但是实验之后发现,这样并不能让gdb中断下来,反而会导致redis-cli这个进程退出。
我们换个思路:直接把redis-cli跑起来,然后使用linux pstack + 进程id来查看下redis-cli的线程数量。
[root@localhost ~]# ps -ef | grep redis-cli
root 35454 12877 0 14:51 pts/1 00:00:00 ./redis-cli
root 35468 33548 0 14:51 pts/5 00:00:00 grep --color=auto redis-cli
[root@localhost ~]# pstack 35454
#0 0x00007f011c2186f0 in __read_nocancel () from /lib64/libpthread.so.0
#1 0x000000000041bc5c in linenoiseEdit (stdin_fd=0, stdout_fd=1, buflen=4096, prompt=<optimized out>, buf=0x7ffea3c20410 "") at linenoise.c:800
#2 linenoiseRaw (buflen=4096, prompt=<optimized out>, buf=0x7ffea3c20410 "") at linenoise.c:991
#3 linenoise (prompt=<optimized out>) at linenoise.c:1059
#4 0x00000000004116ac in repl () at redis-cli.c:1398
#5 0x000000000040aa4e in main (argc=0, argv=0x7ffea3c216b0) at redis-cli.c:2950
通过上面的输出,我们发现redis-cli只有一个主线程。既然只有一个主线程,那么我们可以断定redis-cli中的发给redis-server的命令肯定都是同步的,这里同步的意思是发送命令后一直等待服务器应答或者应答超时。
在redis-cli的main函数(位于文件redis-cli.c中)有这样一段代码:
/* Start interactive mode when no command is provided */
if (argc == 0 && !config.eval) {
/* Ignore SIGPIPE in interactive mode to force a reconnect */
signal(SIGPIPE, SIG_IGN);
/* Note that in repl mode we don't abort on connection error.
* A new attempt will be performed for every command send. */
cliConnect(0);
repl();
}
其中cliConnect(0)调用代码(位于redis-cli.c文件中)如下:
static int cliConnect(int force) {
if (context == NULL || force) {
if (context != NULL) {
redisFree(context);
}
if (config.hostsocket == NULL) {
context = redisConnect(config.hostip,config.hostport);
} else {
context = redisConnectUnix(config.hostsocket);
}
if (context->err) {
fprintf(stderr,"Could not connect to Redis at ");
if (config.hostsocket == NULL)
fprintf(stderr,"%s:%d: %s\n",config.hostip,config.hostport,context->errstr);
else
fprintf(stderr,"%s: %s\n",config.hostsocket,context->errstr);
redisFree(context);
context = NULL;
return REDIS_ERR;
}
/* Set aggressive KEEP_ALIVE socket option in the Redis context socket
* in order to prevent timeouts caused by the execution of long
* commands. At the same time this improves the detection of real
* errors. */
anetKeepAlive(NULL, context->fd, REDIS_CLI_KEEPALIVE_INTERVAL);
/* Do AUTH and select the right DB. */
if (cliAuth() != REDIS_OK)
return REDIS_ERR;
if (cliSelect() != REDIS_OK)
return REDIS_ERR;
}
return REDIS_OK;
}
这个函数做的工作可以分为三步:
context = redisConnect(config.hostip,config.hostport);
cliAuth()
cliSelect()
static int _redisContextConnectTcp(redisContext *c, const char *addr, int port,
const struct timeval *timeout,
const char *source_addr) {
//省略部分无关代码...
rv = getaddrinfo(c->tcp.host,_port,&hints,&servinfo)) != 0
s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1
redisSetBlocking(c,0) != REDIS_OK
connect(s,p->ai_addr,p->ai_addrlen)
redisContextWaitReady(c,timeout_msec) != REDIS_OK
return rv; // Need to return REDIS_OK if alright
}
redisContextWaitReady函数的代码(位于net.c文件中)如下:
static int redisContextWaitReady(redisContext *c, long msec) {
struct pollfd wfd[1];
wfd[0].fd = c->fd;
wfd[0].events = POLLOUT;
if (errno == EINPROGRESS) {
int res;
if ((res = poll(wfd, 1, msec)) == -1) {
__redisSetErrorFromErrno(c, REDIS_ERR_IO, "poll(2)");
redisContextCloseFd(c);
return REDIS_ERR;
} else if (res == 0) {
errno = ETIMEDOUT;
__redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL);
redisContextCloseFd(c);
return REDIS_ERR;
}
if (redisCheckSocketError(c) != REDIS_OK)
return REDIS_ERR;
return REDIS_OK;
}
__redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL);
redisContextCloseFd(c);
return REDIS_ERR;
}
这里贴一下此时的调用堆栈:
(gdb) bt
#0 redisContextWaitReady (c=c@entry=0x66f050, msec=msec@entry=-1) at net.c:213
#1 0x000000000041a4dd in _redisContextConnectTcp (c=c@entry=0x66f050, addr=addr@entry=0x66f011 "127.0.0.1", port=port@entry=6379, timeout=timeout@entry=0x0,
source_addr=source_addr@entry=0x0) at net.c:391
#2 0x000000000041a948 in redisContextConnectTcp (c=c@entry=0x66f050, addr=addr@entry=0x66f011 "127.0.0.1", port=port@entry=6379, timeout=timeout@entry=0x0)
at net.c:420
#3 0x0000000000414ec9 in redisConnect (ip=0x66f011 "127.0.0.1", port=6379) at hiredis.c:682
#4 0x000000000040f6b2 in cliConnect (force=<optimized out>) at redis-cli.c:606
#5 0x000000000040aa49 in main (argc=0, argv=0x7fffffffe680) at redis-cli.c:2949
连接redis-server成功以后,会接着调用上文中提到的cliAuth和cliSelect函数,这两个函数分别根据是否配置了config.auth和config.dbnum来给redis-server发送相关命令。由于我们这里没配置,所以这两个函数实际什么也不做。
583 static int cliSelect(void) {
(gdb) n
585 if (config.dbnum == 0) return REDIS_OK;
(gdb) p config.dbnum
$11 = 0
接着调用repl函数,在这个函数中是一个while循环,不断从命令行中获取用户输入:
//位于redis-cli.c文件中
static void repl(void) {
//...省略无关代码...
while((line = linenoise(context ? config.prompt : "not connected> ")) != NULL) {
if (line[0] != '\0') {
argv = cliSplitArgs(line,&argc);
if (history) linenoiseHistoryAdd(line);
if (historyfile) linenoiseHistorySave(historyfile);
if (argv == NULL) {
printf("Invalid argument(s)\n");
linenoiseFree(line);
continue;
} else if (argc > 0) {
if (strcasecmp(argv[0],"quit") == 0 ||
strcasecmp(argv[0],"exit") == 0)
{
exit(0);
} else if (argv[0][0] == ':') {
cliSetPreferences(argv,argc,1);
continue;
} else if (strcasecmp(argv[0],"restart") == 0) {
if (config.eval) {
config.eval_ldb = 1;
config.output = OUTPUT_RAW;
return; /* Return to evalMode to restart the session. */
} else {
printf("Use 'restart' only in Lua debugging mode.");
}
} else if (argc == 3 && !strcasecmp(argv[0],"connect")) {
sdsfree(config.hostip);
config.hostip = sdsnew(argv[1]);
config.hostport = atoi(argv[2]);
cliRefreshPrompt();
cliConnect(1);
} else if (argc == 1 && !strcasecmp(argv[0],"clear")) {
linenoiseClearScreen();
} else {
long long start_time = mstime(), elapsed;
int repeat, skipargs = 0;
char *endptr;
repeat = strtol(argv[0], &endptr, 10);
if (argc > 1 && *endptr == '\0' && repeat) {
skipargs = 1;
} else {
repeat = 1;
}
issueCommandRepeat(argc-skipargs, argv+skipargs, repeat);
/* If our debugging session ended, show the EVAL final
* reply. */
if (config.eval_ldb_end) {
config.eval_ldb_end = 0;
cliReadReply(0);
printf("\n(Lua debugging session ended%s)\n\n",
config.eval_ldb_sync ? "" :
" -- dataset changes rolled back");
}
elapsed = mstime()-start_time;
if (elapsed >= 500 &&
config.output == OUTPUT_STANDARD)
{
printf("(%.2fs)\n",(double)elapsed/1000);
}
}
}
/* Free the argument vector */
sdsfreesplitres(argv,argc);
}
/* linenoise() returns malloc-ed lines like readline() */
linenoiseFree(line);
}
exit(0);
}
得到用户输入的一行命令后,先保存到历史记录中(以便下一次按键盘上的上下箭头键再次输入),然后校验命令的合法性,如果是本地命令(不需要发送给服务器的命令,如quit、exit)则直接执行,如果是远端命令,则调用issueCommandRepeat函数发送给服务器端:
//位于文件redis-cli.c中
static int issueCommandRepeat(int argc, char **argv, long repeat) {
while (1) {
config.cluster_reissue_command = 0;
if (cliSendCommand(argc,argv,repeat) != REDIS_OK) {
cliConnect(1);
/* If we still cannot send the command print error.
* We'll try to reconnect the next time. */
if (cliSendCommand(argc,argv,repeat) != REDIS_OK) {
cliPrintContextError();
return REDIS_ERR;
}
}
/* Issue the command again if we got redirected in cluster mode */
if (config.cluster_mode && config.cluster_reissue_command) {
cliConnect(1);
} else {
break;
}
}
return REDIS_OK;
}
实际发送命令的函数是cliSendCommand,在cliSendCommand函数中又调用cliReadReply函数,后者又调用redisGetReply函数,在redisGetReply函数中又调用redisBufferWrite函数,在redisBufferWrite函数中最终调用系统API write将我们输入的命令发出去:
//位于hiredis.c文件中
int redisBufferWrite(redisContext *c, int *done) {
int nwritten;
/* Return early when the context has seen an error. */
if (c->err)
return REDIS_ERR;
if (sdslen(c->obuf) > 0) {
nwritten = write(c->fd,c->obuf,sdslen(c->obuf));
if (nwritten == -1) {
if ((errno == EAGAIN && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) {
/* Try again later */
} else {
__redisSetError(c,REDIS_ERR_IO,NULL);
return REDIS_ERR;
}
} else if (nwritten > 0) {
if (nwritten == (signed)sdslen(c->obuf)) {
sdsfree(c->obuf);
c->obuf = sdsempty();
} else {
sdsrange(c->obuf,nwritten,-1);
}
}
}
if (done != NULL) *done = (sdslen(c->obuf) == 0);
return REDIS_OK;
}
在redis-cli中输入set hello world这一个简单的指令后,发送数据的调用堆栈如下:
(gdb) c
Continuing.
127.0.0.1:6379> set hello world
Breakpoint 7, redisBufferWrite (c=c@entry=0x66f050, done=done@entry=0x7fffffffe310) at hiredis.c:831
831 int redisBufferWrite(redisContext *c, int *done) {
(gdb) bt
#0 redisBufferWrite (c=c@entry=0x66f050, done=done@entry=0x7fffffffe310) at hiredis.c:831
#1 0x0000000000415942 in redisGetReply (c=0x66f050, reply=reply@entry=0x7fffffffe368) at hiredis.c:882
#2 0x00000000004102a0 in cliReadReply (output_raw_strings=output_raw_strings@entry=0) at redis-cli.c:846
#3 0x0000000000410e58 in cliSendCommand (argc=argc@entry=3, argv=argv@entry=0x693ed0, repeat=0, repeat@entry=1) at redis-cli.c:1006
#4 0x0000000000411445 in issueCommandRepeat (argc=3, argv=0x693ed0, repeat=<optimized out>) at redis-cli.c:1282
#5 0x00000000004117fa in repl () at redis-cli.c:1444
#6 0x000000000040aa4e in main (argc=0, argv=0x7fffffffe680) at redis-cli.c:2950
当然,待发送的数据需要存储在一个全局静态变量context中,这是一个结构体,定义在hiredis.h文件中。
/* Context for a connection to Redis */
typedef struct redisContext {
int err; /* Error flags, 0 when there is no error */
char errstr[128]; /* String representation of error when applicable */
int fd;
int flags;
char *obuf; /* Write buffer */
redisReader *reader; /* Protocol reader */
enum redisConnectionType connection_type;
struct timeval *timeout;
struct {
char *host;
char *source_addr;
int port;
} tcp;
struct {
char *path;
} unix_sock;
} redisContext;
其中字段obuf指向的是一个sds类型的对象,这个对象用来存储当前需要发送的命令。这也同时解决了命令一次发不完需要暂时缓存下来的问题。
在redisGetReply函数中发完数据后立马调用redisBufferRead去收取服务器的应答。
int redisGetReply(redisContext *c, void **reply) {
int wdone = 0;
void *aux = NULL;
/* Try to read pending replies */
if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
return REDIS_ERR;
/* For the blocking context, flush output buffer and read reply */
if (aux == NULL && c->flags & REDIS_BLOCK) {
/* Write until done */
do {
if (redisBufferWrite(c,&wdone) == REDIS_ERR)
return REDIS_ERR;
} while (!wdone);
/* Read until there is a reply */
do {
if (redisBufferRead(c) == REDIS_ERR)
return REDIS_ERR;
if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
return REDIS_ERR;
} while (aux == NULL);
}
/* Set reply object */
if (reply != NULL) *reply = aux;
return REDIS_OK;
}
拿到应答后就可以解析并显示在终端了。
总结起来,redis-cli是一个实实在在的网络同步通信方式,只不过通信的socket仍然设置成非阻塞模式,这样有如下三个好处:
使用connect连接服务器时,connect函数不会阻塞,可以立即返回,之后调用poll检测socket是否可写来判断是否连接成功。
在发数据时,如果因为对端tcp窗口太小发不出去,write函数也会立即返回,不会阻塞,此时可以将未发送的数据暂存,下次继续发送。
在收数据时,如果当前没有数据可读,则read函数也不会阻塞,程序也可以立即返回,继续响应用户的输入。
redis的通信协议格式
redis客户端与服务器通信使用的是纯文本协议,以\r\n来作为协议或者命令或参数之间的分隔符。
我们接着通过redis-cli给redis-server发送“set hello world”命令。
127.0.0.1:6379> set hello world
此时服务器端收到的数据格式如下:
*3\r\n3\r\nset\r\n5\r\nhello\r\n$5\r\nworld\r\n
其中第一个3是redis命令的标志信息,标志以星号()开始,数字3是请求类型,不同的命令数字可能不一样,接着\r\n分割,后面就是统一的格式:
A指令字符长度\r\n指令A\r\nB指令或key字符长度\r\nB指令\r\nC内容长度\r\nC内容\r\n
不同的指令长度不一样,携带的key和value也不一样,服务器端会根据命令的不同来进一步解析。
总结
至此,我们将redis的服务端和客户端的网络通信模块分析完了,redis的通信模型是非常常见的网络通信模型,也是非常值得学习和模仿的,建议想提高自己网络编程水平的读者可以好好研读一下。同时,在redis源码中,有许多网络通信API的使用小技巧,这也是非常值得我们学习的。
同时,redis中的用到的数据结构(字符串、链表、有序集合等)都有自己的高效实现,因此redis源码也是我们学习数据结构知识非常好的材料。
最后,redis也是目前业界用的最多的内存数据库,它不仅开源,而且源码量也不大。如果您想成为一名合格的服务器端开发人员,您应该去学习它、用好它。
以上是关于redis网络通信模块源码分析(下)的主要内容,如果未能解决你的问题,请参考以下文章