Codis源码分析之Sentinel
Posted 程序员升级之路
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Codis源码分析之Sentinel相关的知识,希望对你有一定的参考价值。
Sentinel是Redis官方的高可用方案,支持静态配置和运行时动态配置,Sentinel本身的机制不是本文重点,有兴趣可以直达官网:
https://redis.io/topics/sentinel
Codis也集成了Sentinel的方案,下面从如何添加Sentinel及Sentiel是如何工作的角度介绍整体流程。
一、添加Sentinel
在Fe中添加Sentinel需要分2步,先是添加:
然后是Sync
添加对应入口为Topom::AddSentinel
func (s *Topom) AddSentinel(addr string) error {
//
ctx, err := s.newContext()
if err != nil {
return err
}
p := ctx.sentinel
for _, x := range p.Servers {
if x == addr {
return errors.Errorf("sentinel-[%s] already exists", addr)
}
}
sentinel := redis.NewSentinel(s.config.ProductName, s.config.ProductAuth)
if err := sentinel.FlushConfig(addr, s.config.SentinelClientTimeout.Duration()); err != nil {
return err
}
p.Servers = append(p.Servers, addr)
p.OutOfSync = true
//更新Zk上保存数据
return s.storeUpdateSentinel(p)
}
可以看到在做完一些基本的检测后,调用storeUpdateSentinel直接更新Zk中的数据,没有其它逻辑。
再看Sync的逻辑,对应入口为Topom::ResyncSentinels
func (s *Topom) ResyncSentinels() error {
p := ctx.sentinel
p.OutOfSync = true
if err := s.storeUpdateSentinel(p); err != nil {
return err
}
config := &redis.MonitorConfig{
Quorum: s.config.SentinelQuorum,
ParallelSyncs: s.config.SentinelParallelSyncs,
DownAfter: s.config.SentinelDownAfter.Duration(),
FailoverTimeout: s.config.SentinelFailoverTimeout.Duration(),
NotificationScript: s.config.SentinelNotificationScript,
ClientReconfigScript: s.config.SentinelClientReconfigScript,
}
sentinel := redis.NewSentinel(s.config.ProductName, s.config.ProductAuth)
//移除所有Master
if err := sentinel.RemoveGroupsAll(p.Servers, s.config.SentinelClientTimeout.Duration()); err != nil {
log.WarnErrorf(err, "remove sentinels failed")
}
//监听Group
if err := sentinel.MonitorGroups(p.Servers, s.config.SentinelClientTimeout.Duration(), config, ctx.getGroupMasters()); err != nil {
log.WarnErrorf(err, "resync sentinels failed")
return err
}
//设置Group Master
s.rewatchSentinels(p.Servers)
var fut sync2.Future
for _, p := range ctx.proxy {
fut.Add()
go func(p *models.Proxy) {
//通知Proxy更新
err := s.newProxyClient(p).SetSentinels(ctx.sentinel)
if err != nil {
log.ErrorErrorf(err, "proxy-[%s] resync sentinel failed", p.Token)
}
fut.Done(p.Token, err)
}(p)
}
for t, v := range fut.Wait() {
switch err := v.(type) {
case error:
if err != nil {
return errors.Errorf("proxy-[%s] sentinel failed", t)
}
}
}
p.OutOfSync = false
return s.storeUpdateSentinel(p)
}
重点看注释的几段代码:
1、移除所有Master
对应代码为:
sentinel.RemoveGroupsAll
再跟进去:
func (s *Sentinel) RemoveGroupsAll(sentinels []string, timeout time.Duration) error {
//
for i := range sentinels {
go func(sentinel string) {
err := s.removeGroupsAllDispatch(cntx, sentinel, timeout)
if err != nil {
s.errorf(err, "sentinel-[%s] remove failed", sentinel)
}
results <- err
}(sentinels[i])
}
//
}
可以看到,重点是调用removeGroupsAllDispatch:
func (s *Sentinel) removeGroupsAllDispatch(ctx context.Context, sentinel string, timeout time.Duration) error {
var err = s.dispatch(ctx, sentinel, timeout, func(c *Client) error {
masters, err := s.mastersCommand(c)
if err != nil {
return err
}
var names []string
for gid := range masters {
names = append(names, s.NodeName(gid))
}
return s.removeCommand(c, names)
})
//
return nil
}
先调用mastersCommand得到Master,然后调用removeCommand,mastersCommand调用的是Sentinel masters命令:
values, err := redigo.Values(client.Do("SENTINEL", "masters"))
removeCommand调用SENTINEL remove移除对集群的监控
client.Send("SENTINEL", "remove", name)
2、监听Group
sentinel.MonitorGroups(p.Servers, s.config.SentinelClientTimeout.Duration(), config, ctx.getGroupMasters());
跟进去
func (s *Sentinel) MonitorGroups(sentinels []string, timeout time.Duration, config *MonitorConfig, groups map[int]string) error {
cntx, cancel := context.WithTimeout(s.Context, timeout)
defer cancel()
resolve := make(map[int]*net.TCPAddr)
var exit = make(chan error, 1)
go func() (err error) {
for gid, addr := range groups {
if err := cntx.Err(); err != nil {
return errors.Trace(err)
}
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
resolve[gid] = tcpAddr
}
return nil
}()
timeout += time.Second * 5
results := make(chan error, len(sentinels))
for i := range sentinels {
go func(sentinel string) {
err := s.monitorGroupsDispatch(cntx, sentinel, timeout, config, resolve)
if err != nil {
s.errorf(err, "sentinel-[%s] monitor failed", sentinel)
}
results <- err
}(sentinels[i])
}
}
//开启监控
go func() {
for gid, tcpAddr := range groups {
var ip, port = tcpAddr.IP.String(), tcpAddr.Port
client.Send("SENTINEL", "monitor", s.NodeName(gid), ip, port, config.Quorum)
}
if len(groups) != 0 {
client.Flush()
}
}()
//设置参数
go func() {
for gid := range groups {
var args = []interface{}{"set", s.NodeName(gid)}
if config.ParallelSyncs != 0 {
args = append(args, "parallel-syncs", config.ParallelSyncs)
}
if config.DownAfter != 0 {
args = append(args, "down-after-milliseconds", int(config.DownAfter/time.Millisecond))
}
if config.FailoverTimeout != 0 {
args = append(args, "failover-timeout", int(config.FailoverTimeout/time.Millisecond))
}
if s.Auth != "" {
args = append(args, "auth-pass", s.Auth)
}
if config.NotificationScript != "" {
args = append(args, "notification-script", config.NotificationScript)
}
if config.ClientReconfigScript != "" {
args = append(args, "client-reconfig-script", config.ClientReconfigScript)
}
client.Send("SENTINEL", args...)
}
if len(groups) != 0 {
client.Flush()
}
}()
主要做了2件事情况,一是调用Sentinel monitor开启对Group监控,二是设置各项参数,这些都在Codis配置文件中有对应配置项,这里不详述。
3、设置Group Master
s.rewatchSentinels(p.Servers)
进入函数看具体逻辑:
func (s *Topom) rewatchSentinels(servers []string) {
s.ha.monitor = redis.NewSentinel(s.config.ProductName, s.config.ProductAuth)
go func(p *redis.Sentinel) {
var trigger = make(chan struct{}, 1)
delayUntil := func(deadline time.Time) {
for !p.IsCanceled() {
var d = deadline.Sub(time.Now())
if d <= 0 {
return
}
time.Sleep(math2.MinDuration(d, time.Second))
}
}
go func() {
defer close(trigger)
callback := func() {
select {
case trigger <- struct{}{}:
default:
}
}
for !p.IsCanceled() {
timeout := time.Minute * 15
retryAt := time.Now().Add(time.Second * 10)
//订阅切主信息
if !p.Subscribe(servers, timeout, callback) {
delayUntil(retryAt)
} else {
callback()
}
}
}()
go func() {
for range trigger {
var success int
for i := 0; i != 10 && !p.IsCanceled() && success != 2; i++ {
timeout := time.Second * 5
//得到最新的Master
masters, err := p.Masters(servers, timeout)
if err != nil {
log.WarnErrorf(err, "fetch group masters failed")
} else {
if !p.IsCanceled() {
//切主
s.SwitchMasters(masters)
}
success += 1
}
delayUntil(time.Now().Add(time.Second * 5))
}
}
}()
}(s.ha.monitor)
先调用Subscribe订阅选主消息:
p.Subscribe(servers, timeout, callback)
跟进去可以看实现:
var channels = []interface{}{"+switch-master"}
go func() {
client.Send("SUBSCRIBE", channels...)
client.Flush()
}()
在订阅之后调用p.Masters,即SENTINEL masters得到最新的Master:
最后更新Master,这里的逻辑主要是更新Zk里的信息和内存中的对象。
4、通知Proxy更新
err := s.newProxyClient(p).SetSentinels(ctx.sentinel)
s.ha.servers = servers
s.rewatchSentinels(s.ha.servers)
这里主要更新ha里的信息,然后调用rewatchSentinels来更新主Redis信息,逻辑和上面rewatchSentinels的逻辑一样,没看明白这里为什么还要来1次,可能只是为了让Proxy有机会更新内存中对象最新的状态。
总结下,Sync的逻辑如下:
1、移除现有Master
先调用SENTINEL masters得到集群信息,再针对集群调用SENTINEL remove移除监控。
2、重新监听Group
通过调用Sentinel monitor完成。
3、重新设置Group Master
订阅切主通道消息,然后SENTINEL masters得到最新Master,再更新ZK中的数据。
4、通知Proxy更新
主要更新Proxy内存中Master一些信息
之所以需要分2步,而不是像Slots迁移那样做到自动根据状态切换,可能是想降低系统复杂度,因为这块不是系统核心流程,并且是后台操作,多点下鼠标而已。
二、自动切主
Sentinel也会在某个Group的Master挂掉的时候自动切换Master,在Dashboard启动时会调用Topom::Start,后者调用rewatchSentinels
s.rewatchSentinels(ctx.sentinel.Servers)
这个方法前面已经分析过,主要订阅Sentinel发送的切主消息,收到相关消息后更新Zk上相关信息,就可以做到主、从自动切换了。
以上是关于Codis源码分析之Sentinel的主要内容,如果未能解决你的问题,请参考以下文章
4.Sentinel源码分析— Sentinel是如何做到降级的?