grpc reslover源码分析

Posted golang算法架构leetcode技术php

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了grpc reslover源码分析相关的知识,希望对你有一定的参考价值。

如何将我们选择的服务解析方式应用到grpc的连接建立中去?

grpc的resolver,就是帮我们解决这个问题的

1.程序启动时,客户端是如何从一个域名/服务名,获取到其对应的实例ip,然后与之建立连接的呢?

2.运行过程中,如果后端的实例挂了,grpc如何感知到,并重新建立连接呢?

使用grpc的时候,首先要做的就是调用Dial或DialContext函数来初始化一个clientConn对象,而resolver是这个连接对象的一个重要的成员,所以我们首先看一看clientConn对象创建过程中,resolver是怎么设置进去的。

cc := &ClientConn{ target: target, csMgr: &connectivityStateManager{}, conns: make(map[*addrConn]struct{}), dopts: defaultDialOptions(), blockingpicker: newPickerWrapper(), czData: new(channelzData), firstResolveEvent: grpcsync.NewEvent(), }
cc.parsedTarget = grpcutil.ParseTarget(cc.target)resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
func ParseTarget(target string) (ret resolver.Target) { var ok bool ret.Scheme, ret.Endpoint, ok = split2(target, "://") if !ok { return resolver.Target{Endpoint: target} } ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/") if !ok { return resolver.Target{Endpoint: target} } return ret}
func (cc *ClientConn) getResolver(scheme string) resolver.Builder { for _, rb := range cc.dopts.resolvers { if scheme == rb.Scheme() { return rb } } return resolver.Get(scheme)}

 

func Get(scheme string) Builder { if b, ok := m[scheme]; ok { return b } return nil}

Get函数是通过m这个map,去查找有没有scheme对应的resolver的builder,那么m这个map是什么时候插入的值呢?这个在resolver的Register函数里:

func Register(b Builder) {m[b.Scheme()] = b}

那么谁会去调用这个Register函数,向map中写入resolver呢?


grpc实现了一个默认的解析器,也就是"passthrough",这个看名字就理解了,就是透传,所谓透传就是,什么都不做,那么什么时候需要透传呢?当你调用DialContext的时候,如果传入的target本身就是一个ip+port,这个时候,自然就不需要再解析什么了。那么"passthrough"对应的这个默认的解析器是什么时候注册到m这个map中的呢?这个调用在passthrough包的init函数里

func init() {resolver.Register(&passthroughBuilder{})}

具体路径是internal/resolver/passthrough/passthrough.go

在clientconn.go里面引入

 _ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver. _ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver. _ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver.

这三个包对应3个默认实现的resolver

% cd ~/go/pkg/mod/google.golang.org/grpc@v1.37.0/internal/resolver
 % tree.|____passthrough| |____passthrough.go|____config_selector.go|____unix| |____unix.go|____dns| |____dns_resolver.go| |____go113.go| |____dns_resolver_test.go|____config_selector_test.go

gRPC client LB 配合 Headless Service


可以自己实现一个dnsresolver

package resolver
import ( "context" "fmt" "net" "sync"
"google.golang.org/grpc/resolver")
func init() { resolver.Register(NewBuilder())}
type mydnsResolver struct { domain string port string address map[resolver.Address]struct{} ctx context.Context cancel context.CancelFunc cc resolver.ClientConn wg sync.WaitGroup rn chan struct{}}
// ResolveNow resolves immediatelyfunc (mr *mydnsResolver) ResolveNow(resolver.ResolveNowOptions) { select { case mr.rn <- struct{}{}: default: }}
// Close stops resolvingfunc (mr *mydnsResolver) Close() { mr.cancel() mr.wg.Wait()}func (mr *mydnsResolver) watcher() { defer func() { if err := recover(); err != nil { fmt.Println(err) } }() defer mr.wg.Done()
for { select { case <-mr.ctx.Done(): return case <-mr.rn: } result, err := mr.resolveByHttpDNS() if err != nil || len(result) == 0 { continue } mr.cc.UpdateState(resolver.State{Addresses: result}) }}
func (mr *mydnsResolver) resolveByHttpDNS() ([]resolver.Address, error) { var items []string = make([]string, 0, 4)
//这里实现通过向http://myself.dns.xyz发送get请求获取实例ip列表,并存入items中
var addresses = make([]resolver.Address, 0, len(items)) for _, v := range items { addr := net.JoinHostPort(v, mr.port) a := resolver.Address{ Addr: addr, ServerName: addr, // same as addr Type: resolver.Backend, } addresses = append(addresses, a) }
return addresses, nil
}
type mydnsBuilder struct {}
func NewBuilder() resolver.Builder { return &mydnsBuilder{}}
// Scheme for mydnsfunc (mb *mydnsBuilder) Scheme() string { return "mydns"}
// Buildfunc (mb *mydnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) { host, port, err := net.SplitHostPort(target.Endpoint) if err != nil { host = target.Endpoint port = "80" }
ctx, cancel := context.WithCancel(context.Background()) mr := &mydnsResolver{ domain: host, port: port, cc: cc, rn: make(chan struct{}, 1), address: make(map[resolver.Address]struct{}), } mr.ctx, mr.cancel = ctx, cancel
mr.wg.Add(1) go mr.watcher()
mr.ResolveNow(resolver.ResolveNowOptions{}) return mr, nil}

当然也可以基于etcd来实现

package balancer
import ( "context" "log" "strings" "time"
"github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/mvcc/mvccpb" "google.golang.org/grpc/resolver")
const schema = "wonamingv3"
var cli *clientv3.Client
type etcdResolver struct { rawAddr string cc resolver.ClientConn}
// NewResolver initialize an etcd clientfunc NewResolver(etcdAddr string) resolver.Builder { return &etcdResolver{rawAddr: etcdAddr}}
func (r *etcdResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) { var err error
if cli == nil { cli, err = clientv3.New(clientv3.Config{ Endpoints: strings.Split(r.rawAddr, ";"), DialTimeout: 15 * time.Second, }) if err != nil { return nil, err } }
r.cc = cc
go r.watch("/" + target.Scheme + "/" + target.Endpoint + "/")
return r, nil}
func (r etcdResolver) Scheme() string { return schema}
func (r etcdResolver) ResolveNow(rn resolver.ResolveNowOption) { log.Println("ResolveNow") // TODO check}
// Close closes the resolver.func (r etcdResolver) Close() { log.Println("Close")}
func (r *etcdResolver) watch(keyPrefix string) { var addrList []resolver.Address
getResp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix()) if err != nil { log.Println(err) } else { for i := range getResp.Kvs { addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(getResp.Kvs[i].Key), keyPrefix)}) } }
r.cc.NewAddress(addrList)
rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix()) for n := range rch { for _, ev := range n.Events { addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix) switch ev.Type { case mvccpb.PUT: if !exist(addrList, addr) { addrList = append(addrList, resolver.Address{Addr: addr}) r.cc.NewAddress(addrList) } case mvccpb.DELETE: if s, ok := remove(addrList, addr); ok { addrList = s r.cc.NewAddress(addrList) } } //log.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) } }}
func exist(l []resolver.Address, addr string) bool { for i := range l { if l[i].Addr == addr { return true } } return false}
func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) { for i := range s { if s[i].Addr == addr { s[i] = s[len(s)-1] return s[:len(s)-1], true } } return nil, false}

基于consul实现

package consul
import ( "errors" "fmt" "regexp" "sync"
"github.com/hashicorp/consul/api" "google.golang.org/grpc/resolver")
const ( defaultPort = "8500")
var ( errMissingAddr = errors.New("consul resolver: missing address")
errAddrMisMatch = errors.New("consul resolver: invalied uri")
errEndsWithColon = errors.New("consul resolver: missing port after port-separator colon")
regexConsul, _ = regexp.Compile("^([A-z0-9.]+)(:[0-9]{1,5})?/([A-z_]+)$"))
func Init() { fmt.Printf("calling consul init\n") resolver.Register(NewBuilder())}
type consulBuilder struct {}
type consulResolver struct { address string wg sync.WaitGroup cc resolver.ClientConn name string disableServiceConfig bool lastIndex uint64}
func NewBuilder() resolver.Builder { return &consulBuilder{}}
func (cb *consulBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
fmt.Printf("calling consul build\n") fmt.Printf("target: %v\n", target) host, port, name, err := parseTarget(fmt.Sprintf("%s/%s", target.Authority, target.Endpoint)) if err != nil { return nil, err }
cr := &consulResolver{ address: fmt.Sprintf("%s%s", host, port), name: name, cc: cc, disableServiceConfig: opts.DisableServiceConfig, lastIndex: 0, }
cr.wg.Add(1) go cr.watcher() return cr, nil
}
func (cr *consulResolver) watcher() { fmt.Printf("calling consul watcher\n") config := api.DefaultConfig() config.Address = cr.address client, err := api.NewClient(config) if err != nil { fmt.Printf("error create consul client: %v\n", err) return }
for { services, metainfo, err := client.Health().Service(cr.name, cr.name, true, &api.QueryOptions{WaitIndex: cr.lastIndex}) if err != nil { fmt.Printf("error retrieving instances from Consul: %v", err) }
cr.lastIndex = metainfo.LastIndex var newAddrs []resolver.Address for _, service := range services { addr := fmt.Sprintf("%v:%v", service.Service.Address, service.Service.Port) newAddrs = append(newAddrs, resolver.Address{Addr: addr}) } fmt.Printf("adding service addrs\n") fmt.Printf("newAddrs: %v\n", newAddrs) cr.cc.NewAddress(newAddrs) cr.cc.NewServiceConfig(cr.name) }
}
func (cb *consulBuilder) Scheme() string { return "consul"}
func (cr *consulResolver) ResolveNow(opt resolver.ResolveNowOption) {}
func (cr *consulResolver) Close() {}
func parseTarget(target string) (host, port, name string, err error) {
fmt.Printf("target uri: %v\n", target) if target == "" { return "", "", "", errMissingAddr }
if !regexConsul.MatchString(target) { return "", "", "", errAddrMisMatch }
groups := regexConsul.FindStringSubmatch(target) host = groups[1] port = groups[2] name = groups[3] if port == "" { port = defaultPort } return host, port, name, nil}


以上是关于grpc reslover源码分析的主要内容,如果未能解决你的问题,请参考以下文章

grpc-go客户端源码分析

grpc client 源码分析

GRPC框架源码分析

grpc Go Client 源码分析

gRPC-go源码:连接管理

grpc源码分析之域名解析