grpc resolver源码分析
Posted golang算法架构leetcode技术php
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了grpc resolver源码分析相关的知识,希望对你有一定的参考价值。
如何将我们选择的服务解析方式应用到grpc的连接建立中去?
grpc的resolver,就是帮我们解决这个问题的
1.程序启动时,客户端是如何从一个域名/服务名,获取到其对应的实例ip,然后与之建立连接的呢?
2.运行过程中,如果后端的实例挂了,grpc如何感知到,并重新建立连接呢?
使用grpc的时候,首先要做的就是调用Dial或DialContext函数来初始化一个clientConn对象,而resolver是这个连接对象的一个重要的成员,所以我们首先看一看clientConn对象创建过程中,resolver是怎么设置进去的。
// Excerpt from the dial path: allocate the ClientConn that the rest of the
// dial sequence fills in (the resolver is attached to it further down).
cc := &ClientConn{
target: target, // dial target as given by the caller; parsed afterwards
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}), // set keyed by *addrConn
dopts: defaultDialOptions(), // starts from the default dial options
blockingpicker: newPickerWrapper(),
czData: new(channelzData),
firstResolveEvent: grpcsync.NewEvent(),
}
grpcutil.ParseTarget(cc.target) =
resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
// ParseTarget breaks a dial target of the form "scheme://authority/endpoint"
// into its three parts.
//
// If either the "://" or the following "/" separator is not found (as
// reported by split2), the whole input is returned as the Endpoint and the
// Scheme and Authority are left empty.
func ParseTarget(target string) (ret resolver.Target) {
	// Result used whenever the target does not have the full three-part shape.
	fallback := resolver.Target{Endpoint: target}

	scheme, rest, found := split2(target, "://")
	if !found {
		return fallback
	}

	authority, endpoint, found := split2(rest, "/")
	if !found {
		return fallback
	}

	return resolver.Target{
		Scheme:    scheme,
		Authority: authority,
		Endpoint:  endpoint,
	}
}
// getResolver returns the resolver.Builder for scheme.
//
// Builders supplied through the dial options are consulted first, in order;
// only when none of them matches does the lookup fall through to the global
// registry via resolver.Get (which may return nil).
func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
	perDial := cc.dopts.resolvers
	for i := range perDial {
		if perDial[i].Scheme() == scheme {
			return perDial[i]
		}
	}
	return resolver.Get(scheme)
}
// Get looks up the Builder registered under scheme in the package-level
// registry m. It returns nil when no builder has been registered for it.
func Get(scheme string) Builder {
	builder, registered := m[scheme]
	if !registered {
		return nil
	}
	return builder
}
Get函数是通过m这个map,去查找有没有scheme对应的resolver的builder,那么m这个map是什么时候插入的值呢?这个在resolver的Register函数里:
// Register stores b in the package-level registry m under the scheme that b
// itself reports. A later registration for the same scheme replaces the
// earlier one.
func Register(b Builder) {
	scheme := b.Scheme()
	m[scheme] = b
}
那么谁会去调用这个Register函数,向map中写入resolver呢?
grpc实现了一个默认的解析器,也就是"passthrough",这个看名字就理解了,就是透传,所谓透传就是,什么都不做,那么什么时候需要透传呢?当你调用DialContext的时候,如果传入的target本身就是一个ip+port,这个时候,自然就不需要再解析什么了。那么"passthrough"对应的这个默认的解析器是什么时候注册到m这个map中的呢?这个调用在passthrough包的init函数里
func init() {
resolver.Register(&passthroughBuilder{})
}
具体路径是internal/resolver/passthrough/passthrough.go
在clientconn.go里面引入
_ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver.
_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
_ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver.
这三个包对应3个默认实现的resolver
% cd ~/go/pkg/mod/google.golang.org/grpc@v1.37.0/internal/resolver
% tree
.
|____passthrough
| |____passthrough.go
|____config_selector.go
|____unix
| |____unix.go
|____dns
| |____dns_resolver.go
| |____go113.go
| |____dns_resolver_test.go
|____config_selector_test.go
gRPC client LB 配合 Headless Service
可以自己实现一个dnsresolver
package resolver
import (
"context"
"fmt"
"net"
"sync"
"google.golang.org/grpc/resolver"
)
func init() {
resolver.Register(NewBuilder())
}
// mydnsResolver is the resolver.Resolver created by mydnsBuilder.Build. A
// single background goroutine (watcher) performs a lookup each time a signal
// arrives on rn and pushes the result to cc.
type mydnsResolver struct {
domain string // host part of the target endpoint, set in Build
port string // port joined onto every resolved IP
address map[resolver.Address]struct{} // allocated in Build; not used elsewhere in the visible code
ctx context.Context // cancelled by Close; watcher exits when it is done
cancel context.CancelFunc // cancels ctx
cc resolver.ClientConn // receives resolved addresses via UpdateState
wg sync.WaitGroup // tracks the watcher goroutine so Close can wait for it
rn chan struct{} // "resolve now" signal; created with buffer size 1 in Build
}
// ResolveNow requests a fresh resolution by signalling the watcher goroutine
// through rn. The send is non-blocking: if a signal is already pending in the
// channel, this call is a no-op, so repeated requests coalesce into one.
func (mr *mydnsResolver) ResolveNow(resolver.ResolveNowOptions) {
select {
case mr.rn <- struct{}{}:
default:
// a resolve request is already queued; nothing to do
}
}
// Close stops resolving: it cancels the resolver's context and then blocks
// until the watcher goroutine has returned.
func (mr *mydnsResolver) Close() {
mr.cancel()
mr.wg.Wait()
}
// watcher is the resolver's background loop. It sleeps until either the
// resolver is closed (ctx cancelled, loop exits) or a resolve request arrives
// on rn, then performs one lookup and hands the outcome to the ClientConn.
//
// A lookup error is forwarded with ReportError instead of being dropped, so
// the ClientConn learns about the failure. An empty, error-free result is
// ignored and the previously published addresses stay in effect.
func (mr *mydnsResolver) watcher() {
	// Deferred calls run last-in-first-out: wg.Done fires first, then the
	// recover handler prints any panic value so a panic in the lookup does
	// not bring down the process.
	defer func() {
		if err := recover(); err != nil {
			fmt.Println(err)
		}
	}()
	defer mr.wg.Done()

	for {
		select {
		case <-mr.ctx.Done():
			return
		case <-mr.rn:
		}

		result, err := mr.resolveByHttpDNS()
		if err != nil {
			mr.cc.ReportError(err)
			continue
		}
		if len(result) == 0 {
			continue
		}
		mr.cc.UpdateState(resolver.State{Addresses: result})
	}
}
// resolveByHttpDNS converts the instance IPs of the service into
// resolver.Address values, pairing every IP with the resolver's port.
//
// The HTTP lookup itself is left as a placeholder in this example, so the IP
// list is always empty and the function currently returns an empty slice and
// a nil error.
func (mr *mydnsResolver) resolveByHttpDNS() ([]resolver.Address, error) {
	ips := make([]string, 0, 4)
	// Placeholder: send a GET request to http://myself.dns.xyz to obtain the
	// instance IP list and store it in ips.

	resolved := make([]resolver.Address, 0, len(ips))
	for i := range ips {
		hostPort := net.JoinHostPort(ips[i], mr.port)
		resolved = append(resolved, resolver.Address{
			Addr:       hostPort,
			ServerName: hostPort, // same as Addr
			Type:       resolver.Backend,
		})
	}
	return resolved, nil
}
// mydnsBuilder is the resolver.Builder for the "mydns" scheme. It carries no
// state; every Build call creates an independent mydnsResolver.
type mydnsBuilder struct {
}
// NewBuilder returns a new mydns resolver.Builder, ready to be passed to
// resolver.Register.
func NewBuilder() resolver.Builder {
return &mydnsBuilder{}
}
// Scheme returns the URI scheme this builder is registered under: "mydns".
func (mb *mydnsBuilder) Scheme() string {
return "mydns"
}
// Build creates a mydnsResolver for target and starts its watcher goroutine.
//
// The endpoint is expected as "host:port"; when it cannot be split that way
// the whole endpoint is used as the domain and the port defaults to "80".
// An initial resolution is requested before returning, so the ClientConn gets
// its first address update without waiting for gRPC to ask. The returned
// error is always nil.
func (mb *mydnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
	// Start from the fallback and overwrite only on a successful split.
	domain, port := target.Endpoint, "80"
	if h, p, err := net.SplitHostPort(target.Endpoint); err == nil {
		domain, port = h, p
	}

	mr := &mydnsResolver{
		domain:  domain,
		port:    port,
		address: make(map[resolver.Address]struct{}),
		cc:      cc,
		rn:      make(chan struct{}, 1),
	}
	mr.ctx, mr.cancel = context.WithCancel(context.Background())

	// Register the goroutine with the WaitGroup before launching it so that
	// Close can never observe a zero counter while the watcher is running.
	mr.wg.Add(1)
	go mr.watcher()

	mr.ResolveNow(resolver.ResolveNowOptions{})
	return mr, nil
}
当然也可以基于etcd来实现
package balancer
import (
"context"
"log"
"strings"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"google.golang.org/grpc/resolver"
)
// schema is the URI scheme reported by etcdResolver.Scheme.
const schema = "wonamingv3"
// cli is the package-wide etcd client. It is nil until the first successful
// Build call creates it, and is then reused by every resolver.
var cli *clientv3.Client
// etcdResolver plays two roles. The value returned by NewResolver acts as the
// resolver.Builder; each Build call then returns a separate etcdResolver that
// acts as the resolver.Resolver for exactly one ClientConn, so that several
// connections built from the same builder do not overwrite each other's cc.
type etcdResolver struct {
	rawAddr string              // etcd endpoints, separated by ";"
	cc      resolver.ClientConn // connection that receives address updates (resolver role only)
	ctx     context.Context     // scopes the etcd Get/Watch calls (resolver role only)
	cancel  context.CancelFunc  // cancels ctx; invoked by Close
}

// NewResolver returns a resolver.Builder that remembers the etcd endpoints in
// etcdAddr (separated by ";"). No etcd connection is made here; the client is
// created lazily by the first Build call.
func NewResolver(etcdAddr string) resolver.Builder {
	return &etcdResolver{rawAddr: etcdAddr}
}

// Build creates the shared etcd client if it does not exist yet, then returns
// a new resolver bound to cc and starts a goroutine that watches the key
// prefix "/<scheme>/<endpoint>/" for address changes.
//
// It returns an error only when the etcd client cannot be created.
func (r *etcdResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
	if cli == nil {
		c, err := clientv3.New(clientv3.Config{
			Endpoints:   strings.Split(r.rawAddr, ";"),
			DialTimeout: 15 * time.Second,
		})
		if err != nil {
			return nil, err
		}
		cli = c
	}

	// A fresh instance per ClientConn: the builder itself is never mutated.
	ctx, cancel := context.WithCancel(context.Background())
	res := &etcdResolver{
		rawAddr: r.rawAddr,
		cc:      cc,
		ctx:     ctx,
		cancel:  cancel,
	}
	go res.watch("/" + target.Scheme + "/" + target.Endpoint + "/")
	return res, nil
}

// Scheme returns the URI scheme handled by this builder.
func (r etcdResolver) Scheme() string {
	return schema
}

// ResolveNow only logs the request; address updates are driven entirely by
// the etcd watch.
func (r etcdResolver) ResolveNow(rn resolver.ResolveNowOption) {
	log.Println("ResolveNow") // TODO check
}

// Close closes the resolver. It cancels the resolver's context, which ends
// the etcd watch and lets the watch goroutine return.
func (r etcdResolver) Close() {
	log.Println("Close")
	if r.cancel != nil { // nil on a value that was never produced by Build
		r.cancel()
	}
}

// watch publishes the addresses stored under keyPrefix and keeps them up to
// date. The address of an instance is the key with keyPrefix stripped.
//
// It first loads the current keys and reports them, then follows the etcd
// watch stream: a PUT for an unknown address appends it, a DELETE for a known
// address removes it, and the full list is re-sent after every change. The
// method returns when the watch channel is closed.
func (r *etcdResolver) watch(keyPrefix string) {
	ctx := r.ctx
	if ctx == nil {
		// Defensive: keeps watch usable on a value not created by Build.
		ctx = context.Background()
	}

	var addrList []resolver.Address
	getResp, err := cli.Get(ctx, keyPrefix, clientv3.WithPrefix())
	if err != nil {
		log.Println(err)
		if ctx.Err() != nil {
			// Closed before the initial load finished; nothing to publish.
			return
		}
	} else {
		for i := range getResp.Kvs {
			addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(getResp.Kvs[i].Key), keyPrefix)})
		}
	}
	r.cc.NewAddress(addrList)

	rch := cli.Watch(ctx, keyPrefix, clientv3.WithPrefix())
	for n := range rch {
		for _, ev := range n.Events {
			addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
			switch ev.Type {
			case mvccpb.PUT:
				if !exist(addrList, addr) {
					addrList = append(addrList, resolver.Address{Addr: addr})
					r.cc.NewAddress(addrList)
				}
			case mvccpb.DELETE:
				if s, ok := remove(addrList, addr); ok {
					addrList = s
					r.cc.NewAddress(addrList)
				}
			}
		}
	}
}
// exist reports whether any entry of l has its Addr field equal to addr.
func exist(l []resolver.Address, addr string) bool {
	for _, candidate := range l {
		if candidate.Addr == addr {
			return true
		}
	}
	return false
}
// remove returns a copy of s without the first entry whose Addr equals addr,
// and true. The removed slot is filled with the last element, so the order of
// the remaining entries is not preserved. When no entry matches it returns
// (nil, false).
//
// The input slice is left untouched: callers hand the same slice to the
// ClientConn before calling remove, so editing its backing array in place
// would change data that is no longer exclusively owned here.
func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
	for i := range s {
		if s[i].Addr != addr {
			continue
		}
		last := len(s) - 1
		out := make([]resolver.Address, last)
		copy(out, s[:last])
		if i < last {
			// Same swap-with-last result as an in-place delete would give.
			out[i] = s[last]
		}
		return out, true
	}
	return nil, false
}
基于consul实现
package consul
import (
	"errors"
	"fmt"
	"regexp"
	"sync"
	"time"

	"github.com/hashicorp/consul/api"
	"google.golang.org/grpc/resolver"
)
const (
// defaultPort is the port parseTarget returns when the target names no port.
defaultPort = "8500"
)
var (
	// errMissingAddr is returned when the target string is empty.
	errMissingAddr = errors.New("consul resolver: missing address")
	// errAddrMisMatch is returned when the target does not match regexConsul.
	errAddrMisMatch = errors.New("consul resolver: invalid uri")
	// errEndsWithColon describes a target whose port separator is not
	// followed by a port.
	errEndsWithColon = errors.New("consul resolver: missing port after port-separator colon")
	// regexConsul matches "host[:port]/service".
	//   group 1: host    - letters, digits, '.' and '-'
	//   group 2: ":port" - optional, 1-5 digits, leading colon included
	//   group 3: service - letters, digits, '_' and '-'
	// The ranges are spelled A-Za-z on purpose: A-z would also admit the
	// punctuation that sits between 'Z' and 'a' in ASCII ([ \ ] ^ _ `).
	// MustCompile turns a bad pattern into a start-up panic rather than a nil
	// regexp that would only fail on first use.
	regexConsul = regexp.MustCompile(`^([A-Za-z0-9.-]+)(:[0-9]{1,5})?/([A-Za-z0-9_-]+)$`)
)
// Init registers the consul resolver builder with gRPC's resolver registry
// and prints a trace line. Unlike an init function it is not run
// automatically; it has to be called explicitly.
func Init() {
fmt.Printf("calling consul init\n")
resolver.Register(NewBuilder())
}
// consulBuilder is the stateless resolver.Builder for the "consul" scheme.
type consulBuilder struct {
}
// consulResolver resolves one Consul service for one ClientConn. Its watcher
// goroutine queries Consul's health endpoint in a loop and forwards the
// instance addresses to cc.
type consulResolver struct {
address string // Consul agent address, assembled in Build from the parsed target
wg sync.WaitGroup // incremented in Build before the watcher goroutine starts
cc resolver.ClientConn // receives address and service-config updates
name string // Consul service name taken from the target
disableServiceConfig bool // copied from the build options; not read in the visible code
lastIndex uint64 // passed as WaitIndex to the next query; updated from each response
}
// NewBuilder returns a new consul resolver.Builder.
func NewBuilder() resolver.Builder {
return &consulBuilder{}
}
// Build parses "<authority>/<endpoint>" of the dial target as
// "host[:port]/service", creates a consulResolver for that service and starts
// its watcher goroutine.
//
// It returns the parseTarget error when the target is empty or malformed.
func (cb *consulBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
	fmt.Printf("calling consul build\n")
	fmt.Printf("target: %v\n", target)

	host, port, name, err := parseTarget(fmt.Sprintf("%s/%s", target.Authority, target.Endpoint))
	if err != nil {
		return nil, err
	}

	// parseTarget hands back an explicit port with its leading ':' but the
	// default port without one. Strip the colon if present so both cases end
	// up as a well-formed "host:port" instead of e.g. "localhost8500".
	if len(port) > 0 && port[0] == ':' {
		port = port[1:]
	}

	cr := &consulResolver{
		address:              host + ":" + port,
		name:                 name,
		cc:                   cc,
		disableServiceConfig: opts.DisableServiceConfig,
		lastIndex:            0,
	}

	cr.wg.Add(1)
	go cr.watcher()
	return cr, nil
}
// watcher creates a Consul client for cr.address and then loops forever,
// asking the health endpoint for the passing instances of cr.name and
// pushing the resulting "address:port" list to the ClientConn.
//
// Each query passes the previous response's LastIndex as WaitIndex. A failed
// query is logged and retried after a short pause; the response metadata is
// not touched in that case because it may be nil. If the client cannot be
// created the goroutine logs the error and exits.
func (cr *consulResolver) watcher() {
	// Balances the wg.Add(1) done by Build before this goroutine is started.
	defer cr.wg.Done()

	// Pause between attempts after a failed query, so a Consul outage does
	// not turn this loop into a busy spin.
	const retryDelay = time.Second

	fmt.Printf("calling consul watcher\n")
	config := api.DefaultConfig()
	config.Address = cr.address
	client, err := api.NewClient(config)
	if err != nil {
		fmt.Printf("error create consul client: %v\n", err)
		return
	}

	for {
		// NOTE(review): the second argument is the tag filter and the service
		// name is passed for it as well - confirm this is intended.
		services, metainfo, err := client.Health().Service(cr.name, cr.name, true, &api.QueryOptions{WaitIndex: cr.lastIndex})
		if err != nil {
			fmt.Printf("error retrieving instances from Consul: %v", err)
			time.Sleep(retryDelay)
			continue
		}
		cr.lastIndex = metainfo.LastIndex

		var newAddrs []resolver.Address
		for _, service := range services {
			addr := fmt.Sprintf("%v:%v", service.Service.Address, service.Service.Port)
			newAddrs = append(newAddrs, resolver.Address{Addr: addr})
		}
		fmt.Printf("adding service addrs\n")
		fmt.Printf("newAddrs: %v\n", newAddrs)
		cr.cc.NewAddress(newAddrs)
		// NOTE(review): the service name is sent as the service config
		// string - verify that this is what the ClientConn expects.
		cr.cc.NewServiceConfig(cr.name)
	}
}
// Scheme returns the URI scheme this builder handles: "consul".
func (cb *consulBuilder) Scheme() string {
return "consul"
}
// ResolveNow is a no-op: the watcher goroutine queries Consul continuously
// on its own, so there is nothing to trigger here.
func (cr *consulResolver) ResolveNow(opt resolver.ResolveNowOption) {
}
// Close is a no-op.
// NOTE(review): nothing here signals the watcher goroutine, which loops
// without an exit condition - it appears to keep running after Close.
func (cr *consulResolver) Close() {
}
// parseTarget splits a "host[:port]/service" string into its parts using
// regexConsul.
//
// Returns:
//   - host: capture group 1.
//   - port: capture group 2 exactly as matched, i.e. including the leading
//     ':' when the target names a port; defaultPort (no colon) when it does
//     not.
//   - name: capture group 3, the service name.
//   - err:  errMissingAddr for an empty target, errAddrMisMatch when the
//     target does not match regexConsul; nil otherwise.
func parseTarget(target string) (host, port, name string, err error) {
	fmt.Printf("target uri: %v\n", target)

	switch {
	case target == "":
		return "", "", "", errMissingAddr
	case !regexConsul.MatchString(target):
		return "", "", "", errAddrMisMatch
	}

	parts := regexConsul.FindStringSubmatch(target)
	host, port, name = parts[1], parts[2], parts[3]
	if port == "" {
		port = defaultPort
	}
	return host, port, name, nil
}
以上是关于grpc resolver源码分析的主要内容,如果未能解决你的问题,请参考以下文章