redis源码阅读-之哨兵流程

Posted 5ycode

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了redis源码阅读-之哨兵流程相关的知识,希望对你有一定的参考价值。

哨兵

redis sentinel用于管理多个redis实例,是redis高可用的解决方案之一,其本身也是分布式架构。

哨兵本身是监听者身份,没有存储功能,哨兵的服务角色及交互

  • 哨兵与主服务

  • 哨兵与从服务

  • 哨兵与哨兵

哨兵的功能

  • 集群监控 检查对应的主从集群是否正常运行(心跳机制)

  • 消息通知 同步sentinel和其他redis的相关信息(特别是某个服务出现问题时)

  • 故障转移 当主从结构中主节点故障了,如果判断为客观下线,哨兵会发起故障转移,保证服务的高可用

  • 配置中心 给客户端提供最新的master地址

名称解析

  • 主观下线sdown(Subjectively Down):哨兵中的master实例,检测到自己的链接断了,就主观认为下线了

  • 客观下线odown(Objectively Down):其他监测该节点的哨兵也认为该节点断了,就是客观下线;

  • quorum: 如果sentinel集群中有quorum个哨兵认为master节点宕机了,就客观的认为master宕机了

  • majority:如果有majority个哨兵同意进行故障转移,才会选择出来一个新的master节点来转移

启动哨兵

# redis-sentinel程序启动
redis-sentinel sentinel.conf
# redis-server程序启动
redis-server sentinel.conf --sentinel

配置哨兵

# monitor一个名为mymaster的主服务器,这个服务故障至少需要2个哨兵同意
sentinel monitor mymaster 127.0.0.1 6379 2
#Sentinel 认为服务器已经断线所需的毫秒数
sentinel down-after-milliseconds mymaster 60000
# 故障转移超时时间
sentinel failover-timeout mymaster 180000
# 在故障转移期间,最多可以有多少个从服务器同时对新的主服务器进行同步
sentinel parallel-syncs mymaster 1

sentinel monitor resque 192.168.1.3 6380 4
sentinel down-after-milliseconds resque 10000
sentinel failover-timeout resque 180000
sentinel parallel-syncs resque 5

核心代码

int main(int argc, char **argv) 
    //哨兵模式
    server.sentinel_mode = checkForSentinelMode(argc,argv);
    initServerConfig();
    //哨兵模式的情况
    if (server.sentinel_mode) 
        //初始化哨兵配置
        initSentinelConfig();
        //初始化哨兵命令和哨兵配置
        initSentinel();
    
    if (argc >= 2) 
        //将配置文件的内容填充到server中,覆盖初始化变量
        loadServerConfig(configfile,options);
    
    initServer();
    if (!server.sentinel_mode) 
        //非哨兵模式
    else
        InitServerLast();
        sentinelIsRunning();
         
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
    aeMain(server.el);


void initServer(void) 
    /**
     * tcp socket监听
     */
    if (server.port != 0 &&
        listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
        exit(1);   
    /**
     * @brief 创建时间处理器,并将serverCron放入处理器里(重要)
     * 在这里创建了aeTimeEvent并扔给了eventLoop->timeEventHead
     */
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) 
        serverPanic("Can't create event loop timers.");
        exit(1);
           
    /**
     * @brief 重点 ##########
     * 监听多少个tcp就创建多少个
     */
    for (j = 0; j < server.ipfd_count; j++) 
        //将acceptTcpHandler 放入文件监听器里,
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            
    



/**
 * 在initServer中添加的时间事件 serverCron中
 */
void sentinelTimer(void) 
    //检查TILT条件
    sentinelCheckTiltCondition();
    sentinelHandleDictOfRedisInstances(sentinel.masters);
    sentinelRunPendingScripts();
    sentinelCollectTerminatedScripts();
    sentinelKillTimedoutScripts();

    /* We continuously change the frequency of the Redis "timer interrupt"
     * in order to desynchronize every Sentinel from every other.
     * This non-determinism avoids that Sentinels started at the same time
     * exactly continue to stay synchronized asking to be voted at the
     * same time again and again (resulting in nobody likely winning the
     * election because of split brain voting). */
    /**
     * 通过随机数,动态调整哨兵的刷新频率
     * 这样可以确保所有的哨兵不在同一个时间点触发,在投票是就会有一定的时间差,尽可能避免同一时间不能投出主节点
     */
    server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ;



ae.c中

void aeMain(aeEventLoop *eventLoop) 
    eventLoop->stop = 0;
     //只要没有停止,就循环执行,这个是主线程
    while (!eventLoop->stop) 
        if (eventLoop->beforesleep != NULL)
            //每次循环前执行beforesleep
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    


int aeProcessEvents(aeEventLoop *eventLoop, int flags)
 if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);    

//时间处理器
static int processTimeEvents(aeEventLoop *eventLoop) 
    //遍历所有的时间处理器,在initServer里注册了serverCron
    e = eventLoop->timeEventHead;  
    while(te) 
         retval = te->timeProc(eventLoop, id, te->clientData);
         te = te->next;
        


/**
 * @brief 时间事件执行
 * @param eventLoop fd
 * @param id  fd
 * @param clientData 
 * @return int 
 */
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) 
    //哨兵模式执行
    if (server.sentinel_mode) sentinelTimer();    


在config.c中

//文件解析
void loadServerConfig(char *filename, char *options) 
    loadServerConfigFromString(config);

void loadServerConfigFromString(char *config) 
    //一行行的解析
    for (i = 0; i < totlines; i++) 
        else if (!strcasecmp(argv[0],"sentinel")) 
             err = sentinelHandleConfiguration(argv+1,argc-1);
        
        


sentinel.c中

/**
 * 哨兵配置解析(一行行的被循环调用)
 * @param argv
 * @param argc
 * @return
 */
char *sentinelHandleConfiguration(char **argv, int argc) 
    if (!strcasecmp(argv[0],"monitor") && argc == 5) 
        /* monitor <name> <host> <port> <quorum> */
        //获取参数
        int quorum = atoi(argv[4]);

        if (quorum <= 0) return "Quorum must be 1 or greater.";
        //根据监听的master节点创建redis实例
        if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
                                        atoi(argv[3]),quorum,NULL) == NULL)    


哨兵核心的数据结构

/**
 * 主要状态数据结构
 */
struct sentinelState 
    char myid[CONFIG_RUN_ID_SIZE+1]; /* This sentinel ID. */
    //当前选举,用于故障转移  当前代
    uint64_t current_epoch;         /* Current epoch. */
    /**
     * sentinel 监听的master节点 hash表
     */
    dict *masters;      /* Dictionary of master sentinelRedisInstances.
                           Key is the instance name, value is the
                           sentinelRedisInstance structure pointer. */
    //tilt模式标识
    int tilt;           /* Are we in TILT mode? */
    //当前执行的脚本数量
    int running_scripts;    /* Number of scripts in execution right now. */
    //tilt开始时间
    mstime_t tilt_start_time;       /* When TITL started. */
    //上次tilt 时间
    mstime_t previous_time;         /* Last time we ran the time handler. */
    //执行脚本队列
    list *scripts_queue;            /* Queue of user scripts to execute. */
    //gossip协议时的ip(如果不是null,代表通过gossip协议向此节点扩散)
    char *announce_ip;  /* IP addr that is gossiped to other sentinels if
                           not NULL. */
    //gossip协议时的端口
    int announce_port;  /* Port that is gossiped to other sentinels if
                           non zero. */
    //
    unsigned long simfailure_flags; /* Failures simulation. */
    int deny_scripts_reconfig; /* Allow SENTINEL SET ... to change script
                                  paths at runtime? */
 sentinel;

typedef struct sentinelRedisInstance 
    /**
     * 当前实例的类型,看SRI开头定义的常量宏
     */
    int flags;      /* See SRI_... defines */
    //实例名称
    char *name;     /* Master name from the point of view of this sentinel. */
    //实例运行id
    char *runid;    /* Run ID of this instance, or unique ID if is a Sentinel.*/
    //配置的选举轮次(纪元)
    uint64_t config_epoch;  /* Configuration epoch. */
    //主机地址
    sentinelAddr *addr; /* Master host. */
    instanceLink *link; /* Link to the instance, may be shared for Sentinels. */
    mstime_t last_pub_time;   /* Last time we sent hello via Pub/Sub. */
    mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
                                 we received a hello from this Sentinel
                                 via Pub/Sub. */
    //收到SENTINEL is-master-down 的回复后设置的时间
    mstime_t last_master_down_reply_time; /* Time of last reply to
                                             SENTINEL is-master-down command. */
    /**
     * 主观下线时间
     */
    mstime_t s_down_since_time; /* Subjectively down since time. */
    /**
     * 客观下线时间
     */
    mstime_t o_down_since_time; /* Objectively down since time. */
    /**
     * 下线时间,如果超过了这个时间,认为主机下线
     */
    mstime_t down_after_period; /* Consider it down after that period. */
    mstime_t info_refresh;  /* Time at which we received INFO output from it. */
    dict *renamed_commands;     /* Commands renamed in this instance:
                                   Sentinel will use the alternative commands
                                   mapped on this table to send things like
                                   SLAVEOF, CONFING, INFO, ... */

    /* Role and the first time we observed it.
     * This is useful in order to delay replacing what the instance reports
     * with our own configuration. We need to always wait some time in order
     * to give a chance to the leader to report the new configuration before
     * we do silly things. */
    int role_reported;
    mstime_t role_reported_time;
    mstime_t slave_conf_change_time; /* Last time slave master addr changed. */

    /* Master specific. */
    /** 主节点独有 */
    //监听该master的其他哨兵
    dict *sentinels;    /* Other sentinels monitoring the same master. */
    //此master的slaves节点
    dict *slaves;       /* Slaves for this master instance. */
    //quorum 当quorum个数sentinel哨兵认为master主节点失联,那么这时客观上认为主节点失联了
    unsigned int quorum;/* Number of sentinels that need to agree on failure. */
    int parallel_syncs; /* How many slaves to reconfigure at same time. */
    char *auth_pass;    /* Password to use for AUTH against master & slaves. */

    /* Slave specific. */
    /**从节点特有属性*/
    //
    mstime_t master_link_down_time; /* Slave replication link down time. */
    int slave_priority; /* Slave priority according to its INFO output. */
    mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
    //从节点的,主节点信息
    struct sentinelRedisInstance *master; /* Master instance if it's slave. */
    char *slave_master_host;    /* Master host as reported by INFO */
    int slave_master_port;      /* Master port as reported by INFO */
    /**
     * info命令里master的状态
     */
    int slave_master_link_status; /* Master link status as reported by INFO */
    unsigned long long slave_repl_offset; /* Slave replication offset. */
    /* Failover */
    /**故障转移相关的变量*/
    /**
     * leader 的runid
     * 如果是主实例,这个标识就是执行故障转移的哨兵runid
     * 如果是哨兵实例,这个标识就是哨兵投票选举出来的runid
     */
    char *leader;       /* If this is a master instance, this is the runid of
                           the Sentinel that should perform the failover. If
                           this is a Sentinel, this is the runid of the Sentinel
                           that this Sentinel voted as leader. */
    //leader 的当前轮次(可以理解为一轮投票的批次号)
    uint64_t leader_epoch; /* Epoch of the 'leader' field. */
    //故障转移对应的轮次(纪元)
    uint64_t failover_epoch; /* Epoch of the currently started failover. */
    //故障转移状态
    int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
    //故障转移状态变更时间
    mstime_t failover_state_change_time;
    //上次发起故障转移到时间
    mstime_t failover_start_time;   /* Last failover attempt start time. */
    //故障转移超时时间,默认180秒
    mstime_t failover_timeout;      /* Max time to refresh failover state. */
    mstime_t failover_delay_logged; /* For what failover_start_time value we
                                       logged the failover delay. */
    //选到的晋升的从节点
    struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
    /* Scripts executed to notify admin or reconfigure clients: when they
     * are set to NULL no script is executed. */
    char *notification_script;
    char *client_reconfig_script;
    sds info; /* cached INFO output */
 sentinelRedisInstance;


/**
 * 实例链接信息
 */
typedef struct instanceLink 
    //引用次数(有几个主机持有这个对象)
    int refcount;          /* Number of sentinelRedisInstance owners. */
    /**
     *  实例的链路状态,
     *  初始状态为1
     *  命令链接和消费订阅链接都成功以后为0
     *  只要有一个链接异常就为1
     */
    int disconnected;      /* Non-zero if we need to reconnect cc or pc. */
    //等待回复的命令数
    int pending_commands;  /* Number of commands sent waiting for a reply. */
    //redis命令执行上下文
    redisAsyncContext *cc; /* Hiredis context for commands. */
    //redis 订阅发布上下文
    redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
    //cc的链接时间
    mstime_t cc_conn_time; /* cc connection time. */
    //pc的链接时间
    mstime_t pc_conn_time; /* pc connection time. */
    //最后收到消息的时间
    mstime_t pc_last_activity; /* Last time we received any message. */
    //最后收到有效ping回复的时间
    mstime_t last_avail_time; /* Last time the instance replied to ping with
                                 a reply we consider valid. */
    //当前发送ping消息的时间,接收到pong后,会置为0,如果为0,重新发送ping消息,并记录时间
    mstime_t act_ping_time;   /* Time at which the last pending ping (no pong
                                 received after it) was sent. This field is
                                 set to 0 when a pong is received, and set again
                                 to the current time if the value is 0 and a new
                                 ping is sent. */
    //最后一次发送ping的时间(正常act_ping_time可以表示),主要防止在故障期间发送过多的ping
    mstime_t last_ping_time;  /* Time at which we sent the last ping. This is
                                 only used to avoid sending too many pings
                                 during failure. Idle time is computed using
                                 the act_ping_time field. */
    //最后一次收到pong的时间
    mstime_t last_pong_time;  /* Last time the instance replied to ping,
                                 whatever the reply was. That's used to check
                                 if the link is idle and must be reconnected. */
    //最后重链接时间
    mstime_t last_reconn_time;  /* Last reconnection attempt performed when
                                   the link was down. */
 instanceLink;

定时任务中的哨兵,在sentinel.c中

/**
 * 在initServer中添加的时间事件 serverCron中
 */
void sentinelTimer(void) 
    //检查TILT条件
    sentinelCheckTiltCondition();
    //核心
    sentinelHandleDictOfRedisInstances(sentinel.masters);
    sentinelRunPendingScripts();
    sentinelCollectTerminatedScripts();
    sentinelKillTimedoutScripts();

    /**
     * 通过随机数,动态调整哨兵的刷新频率
     * 这样可以确保所有的哨兵不在同一个时间点触发,在投票是就会有一定的时间差,尽可能避免同一时间不能投出主节点
     */
    server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ;


哨兵里的redis实例处理sentinelHandleDictOfRedisInstances

/**
 * 处理字典(sentinel.masters)里的hash表中redis实例
 * 有master节点的实例
 * 有从节点的实例
 * 有哨兵实例
 * @param instances
 */
void sentinelHandleDictOfRedisInstances(dict *instances) 
    dictIterator *di;
    dictEntry *de;
    sentinelRedisInstance *switch_to_promoted = NULL;

    /* There are a number of things we need to perform against every master. */
    //将实例字典放入到迭代器
    di = dictGetIterator(instances);
    while((de = dictNext(di)) != NULL) 
        //获取一个实例
        sentinelRedisInstance *ri = dictGetVal(de);
        //处理实例
        sentinelHandleRedisInstance(ri);
        //主实例的情况,递归处理从实例和哨兵
        if (ri->flags & SRI_MASTER) 
            sentinelHandleDictOfRedisInstances(ri->slaves);
            sentinelHandleDictOfRedisInstances(ri->sentinels);
            //如果故障转移了,最终会是这个状态
            if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) 
                switch_to_promoted = ri;
            
        
    
    if (switch_to_promoted)
        //这个时候需要切换监控
        sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
    dictReleaseIterator(di);


void sentinelHandleRedisInstance(sentinelRedisInstance *ri) 
    /* ========== MONITORING HALF ============ */
    /* Every kind of instance */
    /**
     *  建立两个链接,一个用来执行命令,一个用来订阅接收消息
     */
    sentinelReconnectInstance(ri);
    //执行周期性命令  ping  info 和hello广播
    /**
     *  周期性执行命令,
     *  正常情况下:10秒sentinel发送一个info命令,1秒发送一个ping命令,每两秒广播 hello msg
     * 主节点挂了,1秒发送一个info命令
     */
    sentinelSendPeriodicCommands(ri);

    /* Every kind of instance */
    //检查实例是否主观下线
    sentinelCheckSubjectivelyDown(ri);

    /* Masters and slaves */
    if (ri->flags & (SRI_MASTER|SRI_SLAVE)) 
        /* Nothing so far. */
    

    /* Only masters */
    //针对master节点
    if (ri->flags & SRI_MASTER) 
        //检查是否客观下线
        sentinelCheckObjectivelyDown(ri);
        //是否需要开启故障转移
        if (sentinelStartFailoverIfNeeded(ri))
            //请求其他sentinel对master的看法(选举征求其他哨兵的意见),第一次发起必须强制问询
            sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
        //故障转移状态机实现
        sentinelFailoverStateMachine(ri);
        //在选举过程中进来,只需要看哨兵的结果即可,可以不用再发,因为之前发过了
        sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
    


  • 哨兵每秒一次向所有的主、从、sentinel 节点发送一次ping命令

  • 如果一个实例最后一次ping命令的时间超过了down-after-milliseconds,那么就标记为主观下线

  • 如果一个主实例标记了主观下线,其他哨兵会确认是否客观下线

  • 哨兵每10秒一次向所有的主从服务器发送info命令(如果有客观下线,会改为每秒一次)

数据结构以及交互如下:

  • 哨兵节点持有所有的master节点实例

  • master节点实例里又对此master监听的所有哨兵master->sentinels 以及master下的所有从节点

  • 哨兵遍历对应的实例信息

  • 这些实例和自己的节点进行链接、执行命令等

主观下线判断

/**
 * 检查是否客观下线(别人也认为都下线了)
 * @param master
 */
void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) 
    dictIterator *di;
    dictEntry *de;
    unsigned int quorum = 0, odown = 0;
    /**
     * 主观下线状态,遍历所有的哨兵,
     * 如果有quorum个哨兵认为已下线,并且quorum 大于master->quorum 则判定该节点客观下线
     */
    if (master->flags & SRI_S_DOWN) 
        /* Is down for enough sentinels? */
        quorum = 1; /* the current sentinel. */
        /* Count all the other sentinels. */
        di = dictGetIterator(master->sentinels);
        while((de = dictNext(di)) != NULL) 
            //获取对应的哨兵
            sentinelRedisInstance *ri = dictGetVal(de);
            //如果这个哨兵判断也认为该节点下线了,quorum +1
            if (ri->flags & SRI_MASTER_DOWN) quorum++;
        
        dictReleaseIterator(di);
        //达到下线的阈值,标记odown(客观下线为1)
        if (quorum >= master->quorum) odown = 1;
    
    //判定主观下线后,更改该节点的状态
    /* Set the flag accordingly to the outcome. */
    if (odown) 
        // 不是客观下线状态,修改为客观下线
        if ((master->flags & SRI_O_DOWN) == 0) 
            //发出主观下线事件
            sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d",
                quorum, master->quorum);
            //修改master的掩码为客观下线
            master->flags |= SRI_O_DOWN;
            //设置客观下线时间
            master->o_down_since_time = mstime();
        
     else 
        /**
         * 没有判断为客观下线,但是已经标记了客观下线,会把状态修改回来
         */
        if (master->flags & SRI_O_DOWN) 
            sentinelEvent(LL_WARNING,"-odown",master,"%@");
            master->flags &= ~SRI_O_DOWN;
        
    


故障转移状态机处理

/**
 * 故障转移状态机实现,针对不同的状态,处理逻辑不同
 * @param ri
 */
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) 以上是关于redis源码阅读-之哨兵流程的主要内容,如果未能解决你的问题,请参考以下文章

Redis 高可用之主从复制哨兵模式集群模式

redis高可用之主从复制,哨兵,集群

redis集群源码阅读 之 集群握手

NoSQL 之Redis主从复制哨兵和集群介绍及详细搭建步骤

Redis 大型攻略之主从复制哨兵模式群集模式

Redis 大型攻略之主从复制哨兵模式群集模式