redis源码阅读-之哨兵流程
Posted 5ycode
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了redis源码阅读-之哨兵流程相关的知识,希望对你有一定的参考价值。
哨兵
redis sentinel用于管理多个redis实例,是redis高可用的解决方案之一,其本身也是分布式架构。
哨兵本身是监听者身份,没有存储功能,哨兵的服务角色及交互
-
哨兵与主服务
-
哨兵与从服务
-
哨兵与哨兵
哨兵的功能
-
集群监控 检查对应的主从集群是否正常运行(心跳机制)
-
消息通知 同步sentinel和其他redis的相关信息(特别是某个服务出现问题时)
-
故障转移 当主从结构中主节点故障了,如果判断为客观下线,哨兵会发起故障转移,保证服务的高可用
-
配置中心 给客户端提供最新的master地址
名称解析
-
主观下线sdown(Subjectively Down):某个哨兵在down-after-milliseconds时间内没有收到某个实例(主、从或其他哨兵)的有效ping回复,该哨兵就单方面(主观)认为这个实例下线了
-
客观下线odown(Objectively Down):包括自己在内,至少有quorum个监测该master的哨兵都认为它下线了,就是客观下线(只针对master节点);
-
quorum: 如果sentinel集群中有quorum个哨兵认为master节点宕机了,就客观的认为master宕机了
-
majority:如果有majority个哨兵同意进行故障转移,才会选择出来一个新的master节点来转移
启动哨兵
# redis-sentinel程序启动
redis-sentinel sentinel.conf
# redis-server程序启动
redis-server sentinel.conf --sentinel
配置哨兵
# monitor一个名为mymaster的主服务器,至少需要2个哨兵同意才能判定这个主服务器客观下线
sentinel monitor mymaster 127.0.0.1 6379 2
#Sentinel 认为服务器已经断线所需的毫秒数
sentinel down-after-milliseconds mymaster 60000
# 故障转移超时时间
sentinel failover-timeout mymaster 180000
# 在故障转移期间,最多可以有多少个从服务器同时对新的主服务器进行同步
sentinel parallel-syncs mymaster 1
sentinel monitor resque 192.168.1.3 6380 4
sentinel down-after-milliseconds resque 10000
sentinel failover-timeout resque 180000
sentinel parallel-syncs resque 5
核心代码
int main(int argc, char **argv)
//哨兵模式
server.sentinel_mode = checkForSentinelMode(argc,argv);
initServerConfig();
//哨兵模式的情况
if (server.sentinel_mode)
//初始化哨兵配置
initSentinelConfig();
//初始化哨兵命令和哨兵配置
initSentinel();
if (argc >= 2)
//将配置文件的内容填充到server中,覆盖初始化变量
loadServerConfig(configfile,options);
initServer();
if (!server.sentinel_mode)
//非哨兵模式
else
InitServerLast();
sentinelIsRunning();
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
aeMain(server.el);
void initServer(void)
/**
* tcp socket监听
*/
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
exit(1);
/**
* @brief 创建时间处理器,并将serverCron放入处理器里(重要)
* 在这里创建了aeTimeEvent并扔给了eventLoop->timeEventHead
*/
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR)
serverPanic("Can't create event loop timers.");
exit(1);
/**
* @brief 重点 ##########
* 监听多少个tcp就创建多少个
*/
for (j = 0; j < server.ipfd_count; j++)
//将acceptTcpHandler 放入文件监听器里,
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
/**
* 由initServer中注册的时间事件serverCron周期性调用
*/
void sentinelTimer(void)
//检查TILT条件
sentinelCheckTiltCondition();
sentinelHandleDictOfRedisInstances(sentinel.masters);
sentinelRunPendingScripts();
sentinelCollectTerminatedScripts();
sentinelKillTimedoutScripts();
/* We continuously change the frequency of the Redis "timer interrupt"
* in order to desynchronize every Sentinel from every other.
* This non-determinism avoids that Sentinels started at the same time
* exactly continue to stay synchronized asking to be voted at the
* same time again and again (resulting in nobody likely winning the
* election because of split brain voting). */
/**
* 通过随机数,动态调整哨兵的刷新频率
* 这样可以确保各个哨兵不会在同一个时间点触发,在投票时就会有一定的时间差,尽可能避免多个哨兵同时发起投票、导致谁都无法胜出的情况
*/
server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ;
ae.c中
void aeMain(aeEventLoop *eventLoop)
eventLoop->stop = 0;
//只要没有停止,就循环执行,这个是主线程
while (!eventLoop->stop)
if (eventLoop->beforesleep != NULL)
//每次循环前执行beforesleep
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
//时间处理器
static int processTimeEvents(aeEventLoop *eventLoop)
//遍历所有的时间处理器,在initServer里注册了serverCron
te = eventLoop->timeEventHead;
while(te)
retval = te->timeProc(eventLoop, id, te->clientData);
te = te->next;
/**
* @brief 时间事件执行
* @param eventLoop 事件循环
* @param id 时间事件的id
* @param clientData
* @return int
*/
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData)
//哨兵模式执行
if (server.sentinel_mode) sentinelTimer();
在config.c中
//文件解析
void loadServerConfig(char *filename, char *options)
loadServerConfigFromString(config);
void loadServerConfigFromString(char *config)
//一行行的解析
for (i = 0; i < totlines; i++)
else if (!strcasecmp(argv[0],"sentinel"))
err = sentinelHandleConfiguration(argv+1,argc-1);
sentinel.c中
/**
* 哨兵配置解析(一行行的被循环调用)
* @param argv
* @param argc
* @return
*/
char *sentinelHandleConfiguration(char **argv, int argc)
if (!strcasecmp(argv[0],"monitor") && argc == 5)
/* monitor <name> <host> <port> <quorum> */
//获取参数
int quorum = atoi(argv[4]);
if (quorum <= 0) return "Quorum must be 1 or greater.";
//根据监听的master节点创建redis实例
if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
atoi(argv[3]),quorum,NULL) == NULL)
哨兵核心的数据结构
/**
* 主要状态数据结构
*/
struct sentinelState
char myid[CONFIG_RUN_ID_SIZE+1]; /* This sentinel ID. */
//当前选举,用于故障转移 当前代
uint64_t current_epoch; /* Current epoch. */
/**
* sentinel 监听的master节点 hash表
*/
dict *masters; /* Dictionary of master sentinelRedisInstances.
Key is the instance name, value is the
sentinelRedisInstance structure pointer. */
//tilt模式标识
int tilt; /* Are we in TILT mode? */
//当前执行的脚本数量
int running_scripts; /* Number of scripts in execution right now. */
//tilt开始时间
mstime_t tilt_start_time; /* When TILT started. */
//上次执行定时处理函数(time handler)的时间
mstime_t previous_time; /* Last time we ran the time handler. */
//执行脚本队列
list *scripts_queue; /* Queue of user scripts to execute. */
//gossip协议时的ip(如果不是null,代表通过gossip协议向此节点扩散)
char *announce_ip; /* IP addr that is gossiped to other sentinels if
not NULL. */
//gossip协议时的端口
int announce_port; /* Port that is gossiped to other sentinels if
non zero. */
//
unsigned long simfailure_flags; /* Failures simulation. */
int deny_scripts_reconfig; /* Allow SENTINEL SET ... to change script
paths at runtime? */
sentinel;
typedef struct sentinelRedisInstance
/**
* 当前实例的类型,看SRI开头定义的常量宏
*/
int flags; /* See SRI_... defines */
//实例名称
char *name; /* Master name from the point of view of this sentinel. */
//实例运行id
char *runid; /* Run ID of this instance, or unique ID if is a Sentinel.*/
//配置的选举轮次(纪元)
uint64_t config_epoch; /* Configuration epoch. */
//主机地址
sentinelAddr *addr; /* Master host. */
instanceLink *link; /* Link to the instance, may be shared for Sentinels. */
mstime_t last_pub_time; /* Last time we sent hello via Pub/Sub. */
mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
we received a hello from this Sentinel
via Pub/Sub. */
//收到SENTINEL is-master-down 的回复后设置的时间
mstime_t last_master_down_reply_time; /* Time of last reply to
SENTINEL is-master-down command. */
/**
* 主观下线时间
*/
mstime_t s_down_since_time; /* Subjectively down since time. */
/**
* 客观下线时间
*/
mstime_t o_down_since_time; /* Objectively down since time. */
/**
* 下线时间,如果超过了这个时间,认为主机下线
*/
mstime_t down_after_period; /* Consider it down after that period. */
mstime_t info_refresh; /* Time at which we received INFO output from it. */
dict *renamed_commands; /* Commands renamed in this instance:
Sentinel will use the alternative commands
mapped on this table to send things like
SLAVEOF, CONFIG, INFO, ... */
/* Role and the first time we observed it.
* This is useful in order to delay replacing what the instance reports
* with our own configuration. We need to always wait some time in order
* to give a chance to the leader to report the new configuration before
* we do silly things. */
int role_reported;
mstime_t role_reported_time;
mstime_t slave_conf_change_time; /* Last time slave master addr changed. */
/* Master specific. */
/** 主节点独有 */
//监听该master的其他哨兵
dict *sentinels; /* Other sentinels monitoring the same master. */
//此master的slaves节点
dict *slaves; /* Slaves for this master instance. */
//quorum 当quorum个数sentinel哨兵认为master主节点失联,那么这时客观上认为主节点失联了
unsigned int quorum;/* Number of sentinels that need to agree on failure. */
int parallel_syncs; /* How many slaves to reconfigure at same time. */
char *auth_pass; /* Password to use for AUTH against master & slaves. */
/* Slave specific. */
/**从节点特有属性*/
//
mstime_t master_link_down_time; /* Slave replication link down time. */
int slave_priority; /* Slave priority according to its INFO output. */
mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
//从节点的,主节点信息
struct sentinelRedisInstance *master; /* Master instance if it's slave. */
char *slave_master_host; /* Master host as reported by INFO */
int slave_master_port; /* Master port as reported by INFO */
/**
* info命令里master的状态
*/
int slave_master_link_status; /* Master link status as reported by INFO */
unsigned long long slave_repl_offset; /* Slave replication offset. */
/* Failover */
/**故障转移相关的变量*/
/**
* leader 的runid
* 如果是主实例,这个标识就是执行故障转移的哨兵runid
* 如果是哨兵实例,这个标识就是该哨兵投票支持的leader哨兵的runid
*/
char *leader; /* If this is a master instance, this is the runid of
the Sentinel that should perform the failover. If
this is a Sentinel, this is the runid of the Sentinel
that this Sentinel voted as leader. */
//leader 的当前轮次(可以理解为一轮投票的批次号)
uint64_t leader_epoch; /* Epoch of the 'leader' field. */
//故障转移对应的轮次(纪元)
uint64_t failover_epoch; /* Epoch of the currently started failover. */
//故障转移状态
int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
//故障转移状态变更时间
mstime_t failover_state_change_time;
//上次发起故障转移的时间
mstime_t failover_start_time; /* Last failover attempt start time. */
//故障转移超时时间,默认180秒
mstime_t failover_timeout; /* Max time to refresh failover state. */
mstime_t failover_delay_logged; /* For what failover_start_time value we
logged the failover delay. */
//选到的晋升的从节点
struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
/* Scripts executed to notify admin or reconfigure clients: when they
* are set to NULL no script is executed. */
char *notification_script;
char *client_reconfig_script;
sds info; /* cached INFO output */
sentinelRedisInstance;
/**
* 实例链接信息
*/
typedef struct instanceLink
//引用次数(有几个主机持有这个对象)
int refcount; /* Number of sentinelRedisInstance owners. */
/**
* 实例的链路状态,
* 初始状态为1
* 命令链接和消费订阅链接都成功以后为0
* 只要有一个链接异常就为1
*/
int disconnected; /* Non-zero if we need to reconnect cc or pc. */
//等待回复的命令数
int pending_commands; /* Number of commands sent waiting for a reply. */
//redis命令执行上下文
redisAsyncContext *cc; /* Hiredis context for commands. */
//redis 订阅发布上下文
redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
//cc的链接时间
mstime_t cc_conn_time; /* cc connection time. */
//pc的链接时间
mstime_t pc_conn_time; /* pc connection time. */
//最后收到消息的时间
mstime_t pc_last_activity; /* Last time we received any message. */
//最后收到有效ping回复的时间
mstime_t last_avail_time; /* Last time the instance replied to ping with
a reply we consider valid. */
//当前发送ping消息的时间,接收到pong后,会置为0,如果为0,重新发送ping消息,并记录时间
mstime_t act_ping_time; /* Time at which the last pending ping (no pong
received after it) was sent. This field is
set to 0 when a pong is received, and set again
to the current time if the value is 0 and a new
ping is sent. */
//最后一次发送ping的时间(正常act_ping_time可以表示),主要防止在故障期间发送过多的ping
mstime_t last_ping_time; /* Time at which we sent the last ping. This is
only used to avoid sending too many pings
during failure. Idle time is computed using
the act_ping_time field. */
//最后一次收到pong的时间
mstime_t last_pong_time; /* Last time the instance replied to ping,
whatever the reply was. That's used to check
if the link is idle and must be reconnected. */
//最后重链接时间
mstime_t last_reconn_time; /* Last reconnection attempt performed when
the link was down. */
instanceLink;
定时任务中的哨兵,在sentinel.c中
/**
* 由initServer中注册的时间事件serverCron周期性调用
*/
void sentinelTimer(void)
//检查TILT条件
sentinelCheckTiltCondition();
//核心
sentinelHandleDictOfRedisInstances(sentinel.masters);
sentinelRunPendingScripts();
sentinelCollectTerminatedScripts();
sentinelKillTimedoutScripts();
/**
* 通过随机数,动态调整哨兵的刷新频率
* 这样可以确保各个哨兵不会在同一个时间点触发,在投票时就会有一定的时间差,尽可能避免多个哨兵同时发起投票、导致谁都无法胜出的情况
*/
server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ;
哨兵里的redis实例处理sentinelHandleDictOfRedisInstances
/**
* 处理字典(sentinel.masters)里的hash表中redis实例
* 有master节点的实例
* 有从节点的实例
* 有哨兵实例
* @param instances
*/
void sentinelHandleDictOfRedisInstances(dict *instances)
dictIterator *di;
dictEntry *de;
sentinelRedisInstance *switch_to_promoted = NULL;
/* There are a number of things we need to perform against every master. */
//将实例字典放入到迭代器
di = dictGetIterator(instances);
while((de = dictNext(di)) != NULL)
//获取一个实例
sentinelRedisInstance *ri = dictGetVal(de);
//处理实例
sentinelHandleRedisInstance(ri);
//主实例的情况,递归处理从实例和哨兵
if (ri->flags & SRI_MASTER)
sentinelHandleDictOfRedisInstances(ri->slaves);
sentinelHandleDictOfRedisInstances(ri->sentinels);
//如果故障转移了,最终会是这个状态
if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG)
switch_to_promoted = ri;
if (switch_to_promoted)
//这个时候需要切换监控
sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
dictReleaseIterator(di);
void sentinelHandleRedisInstance(sentinelRedisInstance *ri)
/* ========== MONITORING HALF ============ */
/* Every kind of instance */
/**
* 建立两个链接,一个用来执行命令,一个用来订阅接收消息
*/
sentinelReconnectInstance(ri);
//执行周期性命令 ping info 和hello广播
/**
* 周期性执行命令,
* 正常情况下:10秒sentinel发送一个info命令,1秒发送一个ping命令,每两秒广播 hello msg
* 主节点挂了,1秒发送一个info命令
*/
sentinelSendPeriodicCommands(ri);
/* Every kind of instance */
//检查实例是否主观下线
sentinelCheckSubjectivelyDown(ri);
/* Masters and slaves */
if (ri->flags & (SRI_MASTER|SRI_SLAVE))
/* Nothing so far. */
/* Only masters */
//针对master节点
if (ri->flags & SRI_MASTER)
//检查是否客观下线
sentinelCheckObjectivelyDown(ri);
//是否需要开启故障转移
if (sentinelStartFailoverIfNeeded(ri))
//请求其他sentinel对master的看法(选举征求其他哨兵的意见),第一次发起必须强制问询
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
//故障转移状态机实现
sentinelFailoverStateMachine(ri);
//在选举过程中进来,只需要看哨兵的结果即可,可以不用再发,因为之前发过了
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
-
哨兵每秒一次向所有的主、从、sentinel 节点发送一次ping命令
-
如果一个实例距离最后一次有效回复ping命令的时间超过了down-after-milliseconds,那么该实例就会被这个哨兵标记为主观下线
-
如果一个主实例被标记为主观下线,该哨兵会询问其他监听此主实例的哨兵,确认是否达到客观下线
-
哨兵每10秒一次向所有的主从服务器发送info命令(如果有客观下线,会改为每秒一次)
数据结构以及交互如下:
-
哨兵节点持有所有的master节点实例
-
master节点实例里又持有监听此master的所有哨兵(master->sentinels)以及该master下的所有从节点(master->slaves)
-
哨兵遍历对应的实例信息
-
这些实例和自己的节点进行链接、执行命令等
客观下线判断
/**
* 检查是否客观下线(别人也认为都下线了)
* @param master
*/
void sentinelCheckObjectivelyDown(sentinelRedisInstance *master)
dictIterator *di;
dictEntry *de;
unsigned int quorum = 0, odown = 0;
/**
* 当前哨兵已判定该master主观下线时,遍历监听该master的其他哨兵,统计认为它已下线的哨兵数量(包含自己)
* 如果统计到的数量quorum 大于等于 master->quorum,则判定该节点客观下线
*/
if (master->flags & SRI_S_DOWN)
/* Is down for enough sentinels? */
quorum = 1; /* the current sentinel. */
/* Count all the other sentinels. */
di = dictGetIterator(master->sentinels);
while((de = dictNext(di)) != NULL)
//获取对应的哨兵
sentinelRedisInstance *ri = dictGetVal(de);
//如果这个哨兵判断也认为该节点下线了,quorum +1
if (ri->flags & SRI_MASTER_DOWN) quorum++;
dictReleaseIterator(di);
//达到下线的阈值,标记odown(客观下线为1)
if (quorum >= master->quorum) odown = 1;
//根据客观下线的判定结果,更改该节点的状态
/* Set the flag accordingly to the outcome. */
if (odown)
// 不是客观下线状态,修改为客观下线
if ((master->flags & SRI_O_DOWN) == 0)
//发出客观下线(+odown)事件
sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d",
quorum, master->quorum);
//修改master的掩码为客观下线
master->flags |= SRI_O_DOWN;
//设置客观下线时间
master->o_down_since_time = mstime();
else
/**
* 没有判断为客观下线,但是已经标记了客观下线,会把状态修改回来
*/
if (master->flags & SRI_O_DOWN)
sentinelEvent(LL_WARNING,"-odown",master,"%@");
master->flags &= ~SRI_O_DOWN;
故障转移状态机处理
/**
* 故障转移状态机实现,针对不同的状态,处理逻辑不同
* @param ri
*/
void sentinelFailoverStateMachine(sentinelRedisInstance *ri)
以上是关于redis源码阅读-之哨兵流程的主要内容,如果未能解决你的问题,请参考以下文章