Codis源码分析之Sentinel

Posted 程序员升级之路

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Codis源码分析之Sentinel相关的知识,希望对你有一定的参考价值。

Sentinel是Redis官方的高可用方案,支持静态配置和运行时动态配置,Sentinel本身的机制不是本文重点,有兴趣可以直达官网:

https://redis.io/topics/sentinel


Codis也集成了Sentinel的方案,下面从如何添加Sentinel及Sentiel是如何工作的角度介绍整体流程。


一、添加Sentinel

在Fe中添加Sentinel需要分2步,先是添加:


然后是Sync


添加对应入口为Topom::AddSentinel

func (s *Topom) AddSentinel(addr string) error { // ctx, err := s.newContext() if err != nil { return err }
p := ctx.sentinel
for _, x := range p.Servers { if x == addr { return errors.Errorf("sentinel-[%s] already exists", addr) } }
sentinel := redis.NewSentinel(s.config.ProductName, s.config.ProductAuth) if err := sentinel.FlushConfig(addr, s.config.SentinelClientTimeout.Duration()); err != nil { return err  }
p.Servers = append(p.Servers, addr) p.OutOfSync = true  //更新Zk上保存数据 return s.storeUpdateSentinel(p)}

可以看到在做完一些基本的检测后,调用storeUpdateSentinel直接更新Zk中的数据,没有其它逻辑。


再看Sync的逻辑,对应入口为Topom::ResyncSentinels

func (s *Topom) ResyncSentinels() error { p := ctx.sentinel p.OutOfSync = true if err := s.storeUpdateSentinel(p); err != nil { return err }
config := &redis.MonitorConfig{ Quorum: s.config.SentinelQuorum, ParallelSyncs: s.config.SentinelParallelSyncs, DownAfter: s.config.SentinelDownAfter.Duration(), FailoverTimeout: s.config.SentinelFailoverTimeout.Duration(), NotificationScript: s.config.SentinelNotificationScript, ClientReconfigScript: s.config.SentinelClientReconfigScript, }
sentinel := redis.NewSentinel(s.config.ProductName, s.config.ProductAuth) //移除所有Master if err := sentinel.RemoveGroupsAll(p.Servers, s.config.SentinelClientTimeout.Duration()); err != nil { log.WarnErrorf(err, "remove sentinels failed") }  //监听Group if err := sentinel.MonitorGroups(p.Servers, s.config.SentinelClientTimeout.Duration(), config, ctx.getGroupMasters()); err != nil { log.WarnErrorf(err, "resync sentinels failed") return err }  //设置Group Master s.rewatchSentinels(p.Servers)
var fut sync2.Future for _, p := range ctx.proxy { fut.Add() go func(p *models.Proxy) {     //通知Proxy更新 err := s.newProxyClient(p).SetSentinels(ctx.sentinel) if err != nil { log.ErrorErrorf(err, "proxy-[%s] resync sentinel failed", p.Token) } fut.Done(p.Token, err) }(p) } for t, v := range fut.Wait() { switch err := v.(type) { case error: if err != nil { return errors.Errorf("proxy-[%s] sentinel failed", t) } } }
p.OutOfSync = false return s.storeUpdateSentinel(p)}

重点看注释的几段代码:


1、移除所有Master

对应代码为:

sentinel.RemoveGroupsAll

再跟进去:

func (s *Sentinel) RemoveGroupsAll(sentinels []string, timeout time.Duration) error { // for i := range sentinels { go func(sentinel string) { err := s.removeGroupsAllDispatch(cntx, sentinel, timeout) if err != nil { s.errorf(err, "sentinel-[%s] remove failed", sentinel) } results <- err }(sentinels[i]) }
//}

可以看到,重点是调用removeGroupsAllDispatch:

func (s *Sentinel) removeGroupsAllDispatch(ctx context.Context, sentinel string, timeout time.Duration) error { var err = s.dispatch(ctx, sentinel, timeout, func(c *Client) error { masters, err := s.mastersCommand(c) if err != nil { return err } var names []string for gid := range masters { names = append(names, s.NodeName(gid)) } return s.removeCommand(c, names) })  // return nil}

先调用mastersCommand得到Master,然后调用removeCommand,mastersCommand调用的是Sentinel masters命令:

values, err := redigo.Values(client.Do("SENTINEL", "masters"))

removeCommand调用SENTINEL remove移除对集群的监控

client.Send("SENTINEL", "remove", name)


2、监听Group

sentinel.MonitorGroups(p.Servers, s.config.SentinelClientTimeout.Duration(), config, ctx.getGroupMasters());

跟进去

func (s *Sentinel) MonitorGroups(sentinels []string, timeout time.Duration, config *MonitorConfig, groups map[int]string) error { cntx, cancel := context.WithTimeout(s.Context, timeout) defer cancel()
resolve := make(map[int]*net.TCPAddr)
var exit = make(chan error, 1)
  go func() (err error) { for gid, addr := range groups { if err := cntx.Err(); err != nil { return errors.Trace(err) }      tcpAddr, err := net.ResolveTCPAddr("tcp", addr) resolve[gid] = tcpAddr } return nil }()

timeout += time.Second * 5 results := make(chan error, len(sentinels))
for i := range sentinels { go func(sentinel string) { err := s.monitorGroupsDispatch(cntx, sentinel, timeout, config, resolve) if err != nil { s.errorf(err, "sentinel-[%s] monitor failed", sentinel) } results <- err }(sentinels[i])  }}
  //开启监控 go func() { for gid, tcpAddr := range groups { var ip, port = tcpAddr.IP.String(), tcpAddr.Port client.Send("SENTINEL", "monitor", s.NodeName(gid), ip, port, config.Quorum) } if len(groups) != 0 { client.Flush() } }()   //设置参数 go func() { for gid := range groups { var args = []interface{}{"set", s.NodeName(gid)} if config.ParallelSyncs != 0 { args = append(args, "parallel-syncs", config.ParallelSyncs) } if config.DownAfter != 0 { args = append(args, "down-after-milliseconds", int(config.DownAfter/time.Millisecond)) } if config.FailoverTimeout != 0 { args = append(args, "failover-timeout", int(config.FailoverTimeout/time.Millisecond)) } if s.Auth != "" { args = append(args, "auth-pass", s.Auth) } if config.NotificationScript != "" { args = append(args, "notification-script", config.NotificationScript) } if config.ClientReconfigScript != "" { args = append(args, "client-reconfig-script", config.ClientReconfigScript) } client.Send("SENTINEL", args...) } if len(groups) != 0 { client.Flush() } }()

主要做了2件事情况,一是调用Sentinel monitor开启对Group监控,二是设置各项参数,这些都在Codis配置文件中有对应配置项,这里不详述。


3、设置Group Master

 s.rewatchSentinels(p.Servers)

进入函数看具体逻辑:

func (s *Topom) rewatchSentinels(servers []string) {  s.ha.monitor = redis.NewSentinel(s.config.ProductName, s.config.ProductAuth)  go func(p *redis.Sentinel) { var trigger = make(chan struct{}, 1) delayUntil := func(deadline time.Time) { for !p.IsCanceled() { var d = deadline.Sub(time.Now()) if d <= 0 { return } time.Sleep(math2.MinDuration(d, time.Second)) } }  go func() { defer close(trigger) callback := func() { select { case trigger <- struct{}{}: default: } }  for !p.IsCanceled() { timeout := time.Minute * 15 retryAt := time.Now().Add(time.Second * 10)          //订阅切主信息 if !p.Subscribe(servers, timeout, callback) { delayUntil(retryAt) } else { callback() } } }()  go func() { for range trigger { var success int for i := 0; i != 10 && !p.IsCanceled() && success != 2; i++ { timeout := time.Second * 5            //得到最新的Master masters, err := p.Masters(servers, timeout) if err != nil { log.WarnErrorf(err, "fetch group masters failed") } else { if !p.IsCanceled() {               //切主 s.SwitchMasters(masters) } success += 1 } delayUntil(time.Now().Add(time.Second * 5)) } } }() }(s.ha.monitor)  

先调用Subscribe订阅选主消息:

p.Subscribe(servers, timeout, callback)

跟进去可以看实现:

 var channels = []interface{}{"+switch-master"} go func() { client.Send("SUBSCRIBE", channels...) client.Flush() }()



在订阅之后调用p.Masters,即SENTINEL masters得到最新的Master:

最后更新Master,这里的逻辑主要是更新Zk里的信息和内存中的对象。


4、通知Proxy更新

err := s.newProxyClient(p).SetSentinels(ctx.sentinel)
s.ha.servers = serverss.rewatchSentinels(s.ha.servers)

这里主要更新ha里的信息,然后调用rewatchSentinels来更新主Redis信息,逻辑和上面rewatchSentinels的逻辑一样,没看明白这里为什么还要来1次,可能只是为了让Proxy有机会更新内存中对象最新的状态。


总结下,Sync的逻辑如下:


1、移除现有Master

先调用SENTINEL  masters得到集群信息,再针对集群调用SENTINEL remove移除监控。


2、重新监听Group

通过调用Sentinel monitor完成。


3、重新设置Group Master

订阅切主通道消息,然后SENTINEL  masters得到最新Master,再更新ZK中的数据。


4、通知Proxy更新

主要更新Proxy内存中Master一些信息


之所以需要分2步,而不是像Slots迁移那样做到自动根据状态切换,可能是想降低系统复杂度,因为这块不是系统核心流程,并且是后台操作,多点下鼠标而已。


二、自动切主

Sentinel也会在某个Group的Master挂掉的时候自动切换Master,在Dashboard启动时会调用Topom::Start,后者调用rewatchSentinels

s.rewatchSentinels(ctx.sentinel.Servers)

这个方法前面已经分析过,主要订阅Sentinel发送的切主消息,收到相关消息后更新Zk上相关信息,就可以做到主、从自动切换了。






以上是关于Codis源码分析之Sentinel的主要内容,如果未能解决你的问题,请参考以下文章

源码分析 Sentinel 之 Dubbo 适配原理

4.Sentinel源码分析— Sentinel是如何做到降级的?

redis集群之sentinel

Codis3.2集群HA高可用方案

Redis Sentinel 源码分析 - Sentinel的初始化

Redis Sentinel 源码分析 - Sentinel 选举机制和主备倒换