etcd_selector.go

Posted 张伯雨

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了etcd_selector.go相关的知识,希望对你有一定的参考价值。

package clientselector

import (
    "errors"
    "math/rand"
    "net"
    "net/rpc"
    "net/url"
    "strconv"
    "strings"
    "time"

    "golang.org/x/net/context"

    "github.com/coreos/etcd/client"
    "github.com/smallnest/rpcx"
)

// EtcdClientSelector is used to select a rpc server from etcd.
//etcd 负载均衡器 结构体   
type EtcdClientSelector struct {
    EtcdServers        []string   //etcd 客户端连接地址列表
    KeysAPI            client.KeysAPI  //etcd 连接客户端
    ticker             *time.Ticker    //周期执行器
    sessionTimeout     time.Duration   //连接超时时间
    BasePath           string //should endwith serviceName   服务路径或者服务名称
    Servers            []string  //具体微服务地址列表
    Group              string   //组名
    clientAndServer    map[string]*rpc.Client  //map key 就提微服务地址  value 客户端连接 
    metadata           map[string]string  //
    Latitude           float64     //纬度
    Longitude          float64     //经度 
    WeightedServers    []*Weighted  //权重
    SelectMode         rpcx.SelectMode //选择器
    dailTimeout        time.Duration  //连接超时时间
    rnd                *rand.Rand    //序列号
    currentServer      int     //当前第几个服务
    len                int     //服务个数
    HashServiceAndArgs HashServiceAndArgs  //权重选择器
    Client             *rpcx.Client //rpcx客户端
}

// NewEtcdClientSelector creates a EtcdClientSelector
// 创建EtcdClientSelector结构体
func NewEtcdClientSelector(etcdServers []string, basePath string, sessionTimeout time.Duration, sm rpcx.SelectMode, dailTimeout time.Duration) *EtcdClientSelector {
    selector := &EtcdClientSelector{
        EtcdServers:     etcdServers,
        BasePath:        basePath,
        sessionTimeout:  sessionTimeout,
        SelectMode:      sm,
        dailTimeout:     dailTimeout,
        clientAndServer: make(map[string]*rpc.Client),
        metadata:        make(map[string]string),
        rnd:             rand.New(rand.NewSource(time.Now().UnixNano()))}

    selector.start()
    return selector
}

//SetClient set a Client in order that clientSelector can uses it
//客户端设置
func (s *EtcdClientSelector) SetClient(c *rpcx.Client) {
    s.Client = c
}
//SetSelectMode sets SelectMode
//设置重选算法
func (s *EtcdClientSelector) SetSelectMode(sm rpcx.SelectMode) {
    s.SelectMode = sm
}

//AllClients returns rpc.Clients to all servers
//直接返回微服务对应的直连客户端集合
func (s *EtcdClientSelector) AllClients(clientCodecFunc rpcx.ClientCodecFunc) []*rpc.Client {
    var clients []*rpc.Client
    for _, sv := range s.Servers {
        ss := strings.Split(sv, "@")
               //创建直连的客户端 http kcp (基于udp实现的协议)  或者https  以及golang支持的协议
        c, err := rpcx.NewDirectRPCClient(s.Client, clientCodecFunc, ss[0], ss[1], s.dailTimeout)
        if err == nil {
            clients = append(clients, c)
        }
    }
    return clients
}
//获取一个ETCD客户端
func (s *EtcdClientSelector) start() {
    cli, err := client.New(client.Config{
        Endpoints:               s.EtcdServers,
        Transport:               client.DefaultTransport,
        HeaderTimeoutPerRequest: s.sessionTimeout,
    })

    if err != nil {
        return
    }
    s.KeysAPI = client.NewKeysAPI(cli)
    s.pullServers()

    // s.ticker = time.NewTicker(s.sessionTimeout)
    // go func() {
    //     for range s.ticker.C {
    //         s.pullServers()
    //     }
    // }()

    go s.watch()
}

func (s *EtcdClientSelector) watch() {
    watcher := s.KeysAPI.Watcher(s.BasePath, &client.WatcherOptions{
        Recursive: true,
    })

    for {
        res, err := watcher.Next(context.Background())
        if err != nil {
            break
        }

        //services are changed, we pull service again instead of processing single node
        if res.Action == "expire" {
            s.pullServers()
            if !res.Node.Dir {
                // clientAndServer delete the invalid client connection
                removedServer := strings.TrimPrefix(res.Node.Key, s.BasePath+"/")
                delete(s.clientAndServer, removedServer)
            }
        } else if res.Action == "set" || res.Action == "update" {
            s.pullServers()
        } else if res.Action == "delete" {
            s.pullServers()
        }
    }
}
//
func (s *EtcdClientSelector) pullServers() {
//获取key对应的
    resp, err := s.KeysAPI.Get(context.TODO(), s.BasePath, &client.GetOptions{
        Recursive: true,
        Sort:      true,
    })
//
    if err == nil && resp.Node != nil {
        if len(resp.Node.Nodes) > 0 {
            var servers []string
            for _, n := range resp.Node.Nodes {
                servers = append(servers, strings.TrimPrefix(n.Key, s.BasePath+"/"))

            }
            s.Servers = servers

            s.createWeighted(resp.Node.Nodes)

            //set weight based on ICMP result
            if s.SelectMode == rpcx.WeightedICMP {
                for _, w := range s.WeightedServers {
                    server := w.Server.(string)
                    ss := strings.Split(server, "@")
                    host, _, _ := net.SplitHostPort(ss[1])
                    rtt, _ := Ping(host)
                    rtt = CalculateWeight(rtt)
                    w.Weight = rtt
                    w.EffectiveWeight = rtt
                }
            }

            s.len = len(s.Servers)

            if s.len > 0 {
                s.currentServer = s.currentServer % s.len
            }
        } else {
            // when the last instance is down, it should be deleted
            s.clientAndServer = map[string]*rpc.Client{}
        }

    }
}

func (s *EtcdClientSelector) createWeighted(nodes client.Nodes) {
    s.WeightedServers = make([]*Weighted, len(s.Servers))

    var inactiveServers []int

    for i, n := range nodes {
        key := strings.TrimPrefix(n.Key, s.BasePath+"/")
        s.WeightedServers[i] = &Weighted{Server: key, Weight: 1, EffectiveWeight: 1}
        s.metadata[key] = n.Value
        if v, err := url.ParseQuery(n.Value); err == nil {
            w := v.Get("weight")
            state := v.Get("state")
            group := v.Get("group")
            if (state != "" && state != "active") || (s.Group != group) {
                inactiveServers = append(inactiveServers, i)
            }

            if w != "" {
                if weight, err := strconv.Atoi(w); err == nil {
                    s.WeightedServers[i].Weight = weight
                    s.WeightedServers[i].EffectiveWeight = weight
                }
            }
        }
    }

    s.removeInactiveServers(inactiveServers)
}

func (s *EtcdClientSelector) removeInactiveServers(inactiveServers []int) {
    i := len(inactiveServers) - 1
    for ; i >= 0; i-- {
        k := inactiveServers[i]
        removedServer := s.Servers[k]
        s.Servers = append(s.Servers[0:k], s.Servers[k+1:]...)
        s.WeightedServers = append(s.WeightedServers[0:k], s.WeightedServers[k+1:]...)
        c := s.clientAndServer[removedServer]
        if c != nil {
            delete(s.clientAndServer, removedServer)
            c.Close() //close connection to inactive server
        }
    }
}

func (s *EtcdClientSelector) getCachedClient(server string, clientCodecFunc rpcx.ClientCodecFunc) (*rpc.Client, error) {
    c := s.clientAndServer[server]
    if c != nil {
        return c, nil
    }
    ss := strings.Split(server, "@") //
    c, err := rpcx.NewDirectRPCClient(s.Client, clientCodecFunc, ss[0], ss[1], s.dailTimeout)
    s.clientAndServer[server] = c
    return c, err
}

// Select returns a rpc client
func (s *EtcdClientSelector) Select(clientCodecFunc rpcx.ClientCodecFunc, options ...interface{}) (*rpc.Client, error) {
    if s.len == 0 {
        return nil, errors.New("No available service")
    }

    switch s.SelectMode {
    case rpcx.RandomSelect:
        s.currentServer = s.rnd.Intn(s.len)
        server := s.Servers[s.currentServer]
        return s.getCachedClient(server, clientCodecFunc)

    case rpcx.RoundRobin:
        s.currentServer = (s.currentServer + 1) % s.len //not use lock for performance so it is not precise even
        server := s.Servers[s.currentServer]
        return s.getCachedClient(server, clientCodecFunc)

    case rpcx.ConsistentHash:
        if s.HashServiceAndArgs == nil {
            s.HashServiceAndArgs = JumpConsistentHash
        }
        s.currentServer = s.HashServiceAndArgs(s.len, options)
        server := s.Servers[s.currentServer]
        return s.getCachedClient(server, clientCodecFunc)

    case rpcx.WeightedRoundRobin, rpcx.WeightedICMP:
        server := nextWeighted(s.WeightedServers).Server.(string)
        return s.getCachedClient(server, clientCodecFunc)

    case rpcx.Closest:
        closestServers := getClosestServer(s.Latitude, s.Longitude, s.metadata)
        selected := s.rnd.Intn(len(closestServers))
        return s.getCachedClient(closestServers[selected], clientCodecFunc)

    default:
        return nil, errors.New("not supported SelectMode: " + s.SelectMode.String())
    }
}




































以上是关于etcd_selector.go的主要内容,如果未能解决你的问题,请参考以下文章