Redis源码阅读事件机制
Posted gogocome
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis源码阅读事件机制相关的知识,希望对你有一定的参考价值。
Redis源码阅读(一)事件机制
Redis作为一款NoSQL非关系内存数据库,具有很高的读写性能,且原生支持的数据类型丰富,被广泛的作为缓存、分布式数据库、消息队列等应用。此外Redis还有许多高可用特性,包括数据持久化,主从模式备份等等,可以满足对数据完整有一定要求的场景。
而且Redis的源码结构简单清晰,有大量材料可以参阅;通过阅读Redis源码,掌握一些常用技术在Redis中的实现,相信会对个人编程水平有很大帮助。这里记录下我阅读Redis源码的心得。从我自己比较关心的几个技术点出发,每个技术点都是来自个人使用Redis过程中产生的问题。这里也参考了黄建宏老师的《Redis设计与实现》部分内容,不得不说参考这本书再结合源码注释,看起来绝对事半功倍。
当初选用Redis的时候,很大程度上是由于Redis的并发性能很高,可以支持大量并发请求。那Redis是如何支持高并发请求的呢?这里就引入了第一个技术点,事件处理机制。在Redis中使用了单线程的Reactor模式,属于I/O多路复用的一种常见实现模式。这里简单介绍下Reactor模式。
1. Reactor模式
从网上切一个类图,简单描述一下Reactor模式的主体结构
基本概念:
Handle:I/O操作的基本文件句柄,在linux下就是fd
Synchronous Event Demultiplexer :同步事件分离器,阻塞等待Handles中的事件发生。
Reactor: 事件分派器,负责事件的注册,删除以及对所有注册到事件分派器的事件进行监控, 当事件发生时会调用Event Handler接口来处理事件。
Event Handler: 事件处理器接口,这里需要Concrete Event Handler来实现该接口
Concrete Event Handler:真实的事件处理器,通常都是绑定了一个handle,实现对可读事件 进行读取或对可写事件进行写入的操作。
关键点:
I/O多路复用具体到网络请求上指的就是以事件驱动为基础,可实现单个线程侦听多个socket描述符的可读可写或异常状态,不需要为每个socket描述符单独创建一个线程来侦听描述符可读还是可写。在Reactor模式中,对多个描述符进行侦听的部件就是Synchronous Event Demultiplexer,通常是由操作系统提供的select/epoll/kqueue等函数实现。
Reactor模式大致的流程时序:主程序先向事件分派器注册要监听的事件,之后启动事件分派器,由事件分派器调用操作系统提供的同步事件分离器(如select/epoll)侦听事件,当事件发生时事件分派器会调用事件绑定好的处理函数handle_event()来处理事件。
由于是事件分派器是单线程,这就要求每个事件的处理函数handle_event()不能是阻塞的,否则一旦有某个事件的处理函数阻塞住,程序就无法再调用其他事件的处理函数了。
2. 源码实现
在Redis中,事件分为两大类:文件事件和时间事件。文件事件就是指客户端的网络连接请求到达,客户端的发来的命令请求到达以及服务端发出命令应答这几类事件;时间事件主要是Redis内部的定时处理器。
看下Redis对事件机制的代码实现。按照正常的逻辑,Redis服务应该初始化一个事件分派器,然后将绑定了服务器IP,服务端口的连接套接字注册到事件分派器上,之后即可启动事件分派器。启动后客户端连接到Redis服务的请求就可以被事件分派器侦听。
Redis服务器初始化位于redis.c/initServer函数,贴出该函数中有关事件分派器初始化以及服务端口注册的代码:
void initServer() { int j; ...... createSharedObjects(); adjustOpenFilesLimit(); // 初始化事件分派器 server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR); server.db = zmalloc(sizeof(redisDb)*server.dbnum); /* Open the TCP listening socket for the user commands. */ // 打开 TCP 监听端口,用于等待客户端的命令请求 if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR) exit(1); /* Open the listening Unix domain socket. */ // 打开 UNIX 本地端口 if (server.unixsocket != NULL) { unlink(server.unixsocket); /* don‘t care if this fails */ server.sofd = anetUnixServer(server.neterr,server.unixsocket, server.unixsocketperm, server.tcp_backlog); if (server.sofd == ANET_ERR) { redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr); exit(1); } anetNonBlock(NULL,server.sofd); } /* Abort if there are no listening sockets at all. */ if (server.ipfd_count == 0 && server.sofd < 0) { redisLog(REDIS_WARNING, "Configured to not listen anywhere, exiting."); exit(1); } ...... updateCachedTime(); /* Create the serverCron() time event, that‘s our main way to process * background operations. */ // 为 serverCron() 创建时间事件 if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { redisPanic("Can‘t create the serverCron time event."); exit(1); } /* Create an event handler for accepting new connections in TCP and Unix * domain sockets. */ // 为 TCP 连接关联连接应答(accept)处理器 // 用于接受并应答客户端的 connect() 调用 for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { redisPanic( "Unrecoverable error creating server.ipfd file event."); } } // 为本地套接字关联应答处理器 if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event."); ...... }
aeCreateFileEvent函数相当于Reactor模型中的事件注册函数register_handle(),这里对Redis配置文件中每组IP绑定的fd都创建了侦听事件,侦听事件对应的处理器为连接应答处理器,即networking.c/acceptTcpHandler函数。处理器对accept做了封装,当用户调用connect建立连接时,Redis的事件分派器即可收到通知。
用户的与Redis服务器的连接建立后,需要为每个客户端创建一个用来通信的fd,后续接收客户端的命令请求以及发送命令应答给客户端都是通过这个新创建的fd。那什么时候应该把通信的fd注册到事件分派器中呢?显然应该在服务端接收了客户端连接之后。Redis是在acceptTcpHandler函数中也就是在连接应答处理器中创建了这个客户端通信fd的读事件监听,并为这个读事件绑定了命令请求处理器。这是由于客户端在连接Redis之后,后续发送给Redis服务的消息都是命令的形式,并期望服务器对命令给出应答。所以对Redis服务端而言,通信fd的读事件被触发就是客户端发来了命令。
acceptTcpHandler源码如下:
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd, max = MAX_ACCEPTS_PER_CALL; char cip[REDIS_IP_STR_LEN]; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); REDIS_NOTUSED(privdata); redisClient *c; while(max--) { // accept 客户端连接 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); if (cfd == ANET_ERR) { if (errno != EWOULDBLOCK) redisLog(REDIS_WARNING, "Accepting client connection: %s", server.neterr); return; } // snprintf() // 为客户端创建客户端状态(redisClient) c = acceptCommonHandler(cfd,0); if(c != NULL) { snprintf(c->cip, sizeof(c->cip), "%s", cip); c->cport = cport; } redisLog(REDIS_VERBOSE,"Accepted %s:%d %s:%d ", cip, cport, c->cip, c->cport); } }
代码中可以看到acceptTcpHandler函数里会调用networking.c/acceptCommonHandler创建客户端,acceptCommonHandler中的createClient执行了对通信fd可读事件的注册
redisClient *createClient(int fd) { //createClient,主 备全量同步完成后,备创建一个client来接收主到备的实时KV // 分配空间 redisClient *c = zmalloc(sizeof(redisClient)); /* passing -1 as fd it is possible to create a non connected client. * This is useful since all the Redis commands needs to be executed * in the context of a client. When commands are executed in other * contexts (for instance a Lua script) we need a non connected client. */ if (fd != -1) { // 非阻塞 anetNonBlock(NULL,fd); // 禁用 Nagle 算法 anetEnableTcpNoDelay(NULL,fd); // 设置 keep alive if (server.tcpkeepalive) anetKeepAlive(NULL,fd,server.tcpkeepalive); // 绑定读事件到事件 loop (开始接收命令请求) //accept接收到客户端连接的时候调用该函数把fd加入事件集中 if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR) { close(fd); zfree(c); return NULL; } } ... ... // 返回客户端 return c; }
fd可写事件即服务器对客户端发送命令应答的事件,注册是在实际执行针对该命令的处理函数之前,注册的入口较多,不在此列举了。命令应答处理器的函数是networking.c/sendReplyToClient,应答完成后就应该通知事件分派器释放掉对与这个客户端通信的fd的应答事件监控,能看到Redis也是在sendReplyToClient函数中aeDeleteFileEvent中调用的。
Redis中的文件事件处理流程已经大体列出了,还有一个比较重要的环节就是Synchronous Event Demultiplexer的实现。在Redis中是根据操作系统支持的情况选用效率最高的实现。同步事件分离器是封装在ae.h/ae.c中的,使用统一的API供Redis来调用。分离器的具体实现是选用不同操作系统下效率最高的事件分离器,各实际的事件分离器实现在ae_epoll.c/ae_select.c/ae_evport.c/ae_kqueue.c中。
看下选取不同类型事件分离器的代码(ae.c):
/* Include the best multiplexing layer supported by this system. * The following should be ordered by performances, descending. */ #ifdef HAVE_EVPORT #include "ae_evport.c" #else #ifdef HAVE_EPOLL #include "ae_epoll.c" #else #ifdef HAVE_KQUEUE #include "ae_kqueue.c" #else #include "ae_select.c" #endif #endif #endif
以上是关于Redis源码阅读事件机制的主要内容,如果未能解决你的问题,请参考以下文章
为什么C/C++程序员都要阅读Redis源码之:Redis学习事件驱动设计