Redis中的单线程模型
Posted Think_Higher
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis中的单线程模型相关的知识,希望对你有一定的参考价值。
文章目录
Redis基于Reactor模式开发了自己的网络事件处理器,称之为 文件事件处理器(File Event Hanlder)。
- 文件事件处理器由Socket、
- IO多路复用程序
- 文件事件分派器(dispather)
- 事件处理器(handler)
文件事件处理器模型
IO多路复用程序会同时监听多个socket,当被监听的socket准备好执行accept、read、write、close等操作时,与这些操作相对应的文件事件就会产生。IO多路复用程序会把所有产生事件的socket压入一个队列中,然后有序地每次仅一个socket的方式传送给文件事件分派器,文件事件分派器接收到socket之后会根据socket产生的事件类型调用对应的事件处理器进行处理。
文件事件处理器分为几种:
- 连接应答处理器:用于处理客户端的连接请求;
- 命令请求处理器:用于执行客户端传递过来的命令,比如常见的set、lpush等;
- 命令回复处理器:用于返回客户端命令的执行结果,比如set、get等命令的结果;
事件种类:
- AE_READABLE:与两个事件处理器结合使用
Redis的单线程模型怎么理解?
Redis的单线程模型怎么理解?
理解redis的单线程模型之前,需要知道一些redis的基础数据结构以及用法,本文重点不在这块,可参考:
从应用到底层 36张图带你进入Redis世界
《我们一起进大厂》系列- Redis基础
本文分析基于 redis 2.4 源码:
Redis的单线程模型是基于事件模型,redis自己实现了一个单线程的非常简洁的ae event事件驱动机制。当然redis的时间模型也是基于epoll(或者其余kqueue、poll、select)实现的。后文对于多路复用的描述都用epoll替代。
redis的事件模型库中只关注两类事件:文件(socket)事件以及时间事件。
- 文件事件(file event):用于处理 Redis 服务器和客户端之间的网络IO。
- 时间事件(time eveat):Redis 服务器中的一些操作(比如serverCron函数)需要在给定的时间点执行,而时间事件就是处理这类定时操作的。
事件驱动库实现源码在 src/ae.c
目录下,整个事件驱动的运行模型如下图所示:
- 事件管理器是整个事件驱动的核心,管理着文件事件和时间事件;
- 事件管理器的单线程负责循环从epoll里面拉取就绪事件,然后同步执行。
- 事件管理器的单线程中也会遍历时间事件列表,同步执行过期事件。
文件事件
文件事件在这里其实也就是网络文件事件(socket)。redis基于Reactor模式开发了一套文件事件处理器。基于多路复用技术epoll来同时监听多个套接字,并给每个套接字关联对应的事件处理器函数。当套接字可读或者可写的时候,事件处理器就会执行对应的函数。
以Linux下为例,IO多路复用基于epoll实现。源码可参考 ae.c、ae.h、ae_epoll.c;整个事件循环分为几个部分:
- socket连接注册;
- IO多路复用,事件驱动;
- 基于队列的文件事件分发器;
- 文件事件处理器;
如下图:
redis通过epoll的API去监听就绪的文件描述符,这里需要强调一下的是,tcp处理连接的socket同样也是通过epoll去监听。
IO多路复用程序监听,然后将对应的就绪fd列表放到队列里面;redis的线程会顺序执行这些就绪事件。
可以看一下redis里面对于就绪事件的定义:
/* File event structure */
typedef struct aeFileEvent
int mask; /* one of AE_(READABLE|WRITABLE) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
// redis client对象
void *clientData;
aeFileEvent;
/* A fired event */
typedef struct aeFiredEvent
int fd;
int mask;
aeFiredEvent;
/* State of an event based program */
typedef struct aeEventLoop
// 当前 event loop 监听的fd列表里面的最大值,如果当前 event loop 监听的fd列表为空,则maxfd为-1
int maxfd;
long long timeEventNextId;
// 注册的监听fd列表,最大支持10240个fd, 这里的下标含义是fd
aeFileEvent events[AE_SETSIZE]; /* Registered events */
// 就绪的fd列表
aeFiredEvent fired[AE_SETSIZE]; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
// 这个是epoll这个fd对应的数据对象
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeEventLoop;
aeEventLoop
会通过epoll拿到就绪的事件列表然后存放到其就绪列表里面 aeFiredEvent fired[AE_SETSIZE]
。
针对每一个客户端的连接socket,aeEventLoop
都会维护一个对应的实体aeFileEvent
,这个实体里面维护了其对应的读写事件的处理器以及相应的客户端实体。
aeEventLoop
做的事情就是根据fired
数组中就绪的fd去已注册的时间events
钟找到对应的处理器执行。
具体的细节后面介绍。
时间事件
我们知道redis支持一些key的过期策略,所以其底层必然会有对应的时间事件实体。redis中定义如下:
/* Time event structure */
typedef struct aeTimeEvent
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
// 单链表后节点指针
struct aeTimeEvent *next;
aeTimeEvent;
包含几部分很重要信息:
- 执行的时间戳,包括秒级别和毫秒级别;
- 对应的时间事件处理器
- 对应的socket客户端相关数据
- 单链表的next节点。
一个时间事件是定时事件还是周期性事件取决于时间处理器的返回值:
- 如果返回值是 AE_NOMORE,那么这个事件是一个定时事件,该事件在达到后删除,之后不会再重复执行。
- 如果返回值是非 AE_NOMORE 的值,那么这个事件为周期性事件,当一个时间事件到达后,服务器会根据时间处理器的返回值,对时间事件的 when 属性进行更新,让这个事件在一段时间后再次达到。
Redis 将所有时间事件都放在一个单向无序链表中,每次 Redis 会遍历整个链表,查找所有已经到达的时间事件,并且调用相应的事件处理器。时间复杂度不可避免是O(n)
新建事件管理器
Redis Server端初始化时候会创建事件管理器。调用链路是:
main() -> initServer() -> server.el = aeCreateEventLoop();
这里重点在 aeCreateEventLoop();
函数逻辑:
aeEventLoop *aeCreateEventLoop(void)
aeEventLoop *eventLoop;
int i;
eventLoop = zmalloc(sizeof(*eventLoop));
if (!eventLoop) return NULL;
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
if (aeApiCreate(eventLoop) == -1)
zfree(eventLoop);
return NULL;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
for (i = 0; i < AE_SETSIZE; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
static int aeApiCreate(aeEventLoop *eventLoop)
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
// 内核会产生一个 epoll 实例数据结构并返回一个文件描述符
// 这个特殊的描述符就是epoll实例的句柄,后面的两个epoll接口都以它为中心(即epfd形参)。
state->epfd = epoll_create(1024); /* 1024 is just an hint for the kernel */
if (state->epfd == -1) return -1;
eventLoop->apidata = state;
return 0;
typedef struct aeApiState
int epfd;
struct epoll_event events[AE_SETSIZE];
aeApiState;
主要逻辑比较清晰明了
- 新建一个
aeEventLoop
对象,并做属性初始化; - 调用
aeApiCreate(eventLoop)
来创建IO多路复用器,这里只看epoll的实现:- 新建一个
aeApiState
对象,然后通过epoll的api:epoll_create
,在内核创建一个epoll实例,并返回epoll实例的文件描述符; - 根据epoll信息将
aeApiState
初始化epoll相关信息;
- 新建一个
- 初始化
aeEventLoop
的已注册时间列表。
aeApiState对象中epfd存储epoll的标识,events是一个epoll就绪事件数组,当有epoll事件发生时,所有发生的epoll事件和其描述符将存储在这个数组中。这个就绪事件数组由应用层开辟空间、内核负责把所有发生的事件填充到该数组。
创建文件事件
文件事件的模型结构aeFileEvent
的定义如下:
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
/* File event structure */
typedef struct aeFileEvent
int mask; /* one of AE_(READABLE|WRITABLE) */
// 读事件处理器
aeFileProc *rfileProc;
// 写事件处理器
aeFileProc *wfileProc;
// redis client对象
void *clientData;
aeFileEvent;
对于每一个socket连接,都会有一个aeFileEvent对象维护,主要包括几部分数据:
- 事件处理器处理的事件类型,用mask表示
- 读写事件处理器
- 当前socket连接,锁一一对应的redis client数据。
Redis通过调用aeCreateFileEvent
函数来创建文件事件:
// 注册一个socket的fd到网络模型(epoll、select、poll)中
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
if (fd >= AE_SETSIZE) return AE_ERR;
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
这里入参:
- eventLoop: 事件管理器;
- fd:当前需要新建的文件事件所对应的fd;
- mask:标记,表示需要处理的事件类型
- proc:事件处理函数;
- clientData:客户端相关数据
根据aeCreateFileEvent
的入参可以知道,在注册fd到底层多路复用之前,在上层已经通过socket建立连接拿到了客户端连接的fd。
aeCreateFileEvent
主要做几件事:
- 调用
aeApiAddEvent
,监听指定 fd 的指定事件,这里分析基于epoll的实现。 - 标记fd监听的事件类型,以及绑定事件处理器;
- 更新当前eventlopp监听的最大fd。
aeApiAddEvent
的实现有多套,主要是对接不同IO多路复用实现,比如epoll、kqueue等等。这里主要看epoll实现:
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
aeApiState *state = eventLoop->apidata;
struct epoll_event ee;
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.u64 = 0; /* avoid valgrind warning */
ee.data.fd = fd;
// 将被监听的描述符 fd 添加到红黑树或从红黑树中删除或者对监听事件进行修改
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
整体逻辑也比较清晰,主要是调用epoll的API:epoll_ctl
将fd注册到epoll内核。
整体分析下来比较清晰:创建文件事件主要工作就是将fd和对应的事件类型注册到底层IO多路复用选择器中。
Tcp Server文件事件
对于客户端已经建立连接的socket的处理,比较容易理解。那么redis对于TCP server处理客户端的连接请求是怎么做的呢?
还是从 redis 的代码逻辑触发,在 initServer()
初始化时候有初始化tcp server逻辑:
if (server.port != 0)
// 启动端口的tcp监听,并拿到对应的fd
server.ipfd = anetTcpServer(server.neterr,server.port,server.bindaddr);
if (server.ipfd == ANET_ERR)
redisLog(REDIS_WARNING, "Opening port %d: %s",
server.port, server.neterr);
exit(1);
anetTcpServer
函数负责开启tcp server并监听redis server的端口,底层的逻辑主要就是unix的系统调用:bind()
以及 listen()
函数的系统调用,这里就不展开了。
上面的逻辑主要是获取到了tcp server对应socket的fd:server.ipfd
。
initServer()
再往下:
initServer()
.......
if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR) oom("creating file event");
......
这里通过调用文件事件创建函数,将server.ipfd
注册到了底层的IO多路复用处理器。 也就是说tcp server的socket在redis模型中也是一个普通的文件事件处理。
我们看一些对应socket读写事件处理函数:acceptTcpHandler
:
// fd:这里指的是server端tcp的socket的fd
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask)
int cport, cfd;
......
// 从fd里面accept客户端的连接,返回值cfd是指和客户端建立连接后获取的fd。
cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
......
redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
acceptCommonHandler(cfd);
static void acceptCommonHandler(int fd)
redisClient *c;
if ((c = createClient(fd)) == NULL)
redisLog(REDIS_WARNING,"Error allocating resoures for the client");
close(fd); /* May be already closed, just ingore errors */
return;
......
if (server.maxclients && listLength(server.clients) > server.maxclients)
......
freeClient(c);
return;
......
redisClient *createClient(int fd)
redisClient *c = zmalloc(sizeof(redisClient));
......
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
close(fd);
zfree(c);
return NULL;
......
return c;
- 首先会和客户端建立socket连接,拿到客户端连接的fd;
- 为客户端的连接fd创建redis client 对象
- 给客户端fd创建文件事件,注册fd到多路复用。
- 给客户端socket文件事件的事件处理函数是:
readQueryFromClient
整体来看tcp server其实在多路复用中也是一个普通的fd,事件处理器就是接受客户端连接,然后注册客户端fd到IO多路复用器中。
文件/时间事件处理
redis的单线程特性主要也是体现在 文件/时间事件 的单线程处理上。
aeMain
函数以一个无限循环不断地调用aeProcessEvents
函数来处理所有的事件:
void aeMain(aeEventLoop *eventLoop)
eventLoop->stop = 0;
while (!eventLoop->stop)
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
下面重点分析aeProcessEvents
的伪代码:
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
/* 获取到达时间距离当前时间最接近的时间事件*/
time_event = aeSearchNearestTimer();
/* 计算最接近的时间事件距离到达还有多少毫秒*/
remained_ms = time_event.when - unix_ts_now();
/* 如果事件已经到达,那么remaind_ms为负数,将其设置为0 */
if (remained_ms < 0) remained_ms = 0;
/* 根据 remaind_ms 的值,创建 timeval 结构*/
timeval = create_timeval_with_ms(remained_ms);
/* 阻塞并等待文件事件产生,最大阻塞时间由传入的 timeval 结构决定,如果remaind_ms 的值为0,则aeApiPoll 调用后立刻返回,不阻塞*/
/* aeApiPoll调用epoll_wait函数,等待I/O事件*/
aeApiPoll(timeval);
/* 处理所有已经产生的文件事件*/
processFileEvents();
/* 处理所有已经到达的时间事件*/
processTimeEvents();
重点在于aeApiPoll
,它调用epoll_wait阻塞等待epoll的事件就绪,超时时间就是之前根据最快达到时间事件计算而来的超时时间;然后将就绪的epoll事件转换到fired就绪事件。aeApiPoll就是上文所说的I/O多路复用程序。具体过程如下。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
// 阻塞等待注册的事件发生,返回事件的数目,并将触发的事件写入events数组中。
// epoll_wait用于向用户进程返回ready list。
// 这里最后一个参数timeout单位是毫秒,epoll调用超时时间:
// 大于0,阻塞;等于0马上返回,-1表示阻塞直到有事件就绪
retval = epoll_wait(state->epfd,state->events,AE_SETSIZE,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0)
int j;
// 获取epoll的ready list 并放到event loop的 fired list里面
numevents = retval;
for (j = 0; j < numevents; j++)
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
return numevents;
processFileEvents 函数会遍历所有的就绪事件,并调用对应的事件处理器;
processTimeEvents 函数会遍历时间事件列表,执行所有到达执行时间的时间事件。
删除文件事件
当不在需要某个事件时,需要把事件删除掉。例如: 如果fd同时监听读事件、写事件。当不在需要监听写事件时,可以把该fd的写事件删除。
aeDeleteEventLoop函数的执行过程总结为以下几个步骤
- 根据fd在未就绪表中查找到事件
- 取消该fd对应的相应事件标识符
- 调用aeApiFree函数,内核会将epoll监听红黑树上的相应事件监听取消。
以上是关于Redis中的单线程模型的主要内容,如果未能解决你的问题,请参考以下文章