小米出品——gRPC Name Resolver 原理及实践
Posted GoCN
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了小米出品——gRPC Name Resolver 原理及实践相关的知识,希望对你有一定的参考价值。
本文字数:5380 字
精读时间:12 分钟
也可在 6 分钟内完成速读
01
前言
02
实现自定义 Name Resolver
resolver.go
、
resolver_build.go
、
dail.go
。
ns # 自定义 resolver 包名
├── dial.go # 封装了 gRPC 包的 grpc.DialContext() 方法,严格来说 dail.go 不应该放在 ns 包下,本例中这么做只是为简化包布局,方便读者理解
├── resolver.go # 实现了 gRPC resolver 包 Resolver 接口的 nsResolver
└── resolver_builder.go # 实现了 gRPC resolver 包 ResolverBuilder 接口的 nsResolverBuilder
03
定义 nsResolver
resolver.go
:
package ns
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
"mypkg/internal/logz" // 私有日志包,基于 uber 开源的 zap 实现
sdk "mypkg/internal/soa-sdk" // 私有 ns sdk 包,封装了内部 soa 平台进行服务发现的 sdk
_ "google.golang.org/grpc"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
const (
// syncNSInterval 定义了从 NS 服务同步实例列表的周期
syncNSInterval = 1 * time.Second
)
// nsResolver 实现了 resolver.Resolver 接口
type nsResolver struct {
target resolver.Target
cc resolver.ClientConn
ctx context.Context
cancel context.CancelFunc
...
}
// watcher 轮询并更新指定 CalleeService 服务的实例变化
func (r *nsResolver) watcher() {
r.updateCC()
ticker := time.NewTicker(syncNSInterval)
for {
select {
// 当* nsResolver Close 时退出监听
case <-r.ctx.Done():
ticker.Stop()
return
case <-ticker.C:
// 调用* nsResolver.updagteCC() 方法,更新实例地址
r.updateCC()
}
}
}
// updateCC 更新 resolver.Resolver.ClientConn 配置
func (r *nsResolver) updateCC() {
// 从 NS 服务获取指定 target 的实例列表
instances, err := r.getInstances(r.target)
// 如果获取实例列表失败,或者实例列表为空,则不更新 resolver 中实例列表
if err != nil || len(instances.CalleeIns) == 0 {
logz.Warn("[mis] error retrieving instances from Mis", logz.Any("target", r.target), logz.Error(err))
return
}
...
// 组装实例列表 []resolver.Address
// resolver.Address 结构体表示 grpc server 端实例地址
var newAddrs []resolver.Address
for k := range instances.CalleeIns {
newAddrs = append(newAddrs, instances.CalleeIns)
}
...
// 更新实例列表
// grpc 底层 LB 组件对每个服务端实例创建一个 subConnection。并根据设定的 LB 策略,选择合适的 subConnection 处理某次 RPC 请求。
// 此处代码比较复杂,后续在 LB 相关原理文章中再做概述
r.cc.UpdateState(resolver.State{Addresses: newAddrs})
}
// ResolveNow 实现了 resolver.Resolver.ResolveNow 方法
func (*nsResolver) ResolveNow(o resolver.ResolveNowOption) {}
// Close 实现了 resolver.Resolver.Close 方法
func (r *nsResolver) Close() {
r.cancel()
}
// instances 包含调用方服务名、被调方服务名、被调方实例列表等数据
type instances struct {
callerService string
calleeService string
calleeIns []string
}
// getInstances 获取指定服务所有可用的实例列表
func (r *nsResolver) getInstances(target resolver.Target) (s *instances, e error) {
auths := strings.Split(target.Authority, "@")
// auths[0] 为 callerService 名,target.Endpoint 为 calleeService 名
// 通过自定义 sdk 从内部 NameServer 查询指定 calleeService 对应的实例列表
ins, e := sdk.GetInstances(auths[0], target.Endpoint)
if e != nil {
return nil, e
}
return &instances{
callerService: auths[0],
calleeService: target.Endpoint,
calleeIns: ins.Instances,
}, nil
}
04
定义 nsResolverBuilder
ns/resolver_builder.go
构建 nsResolver 时,我们参考
google.golang.org/grpc/resolver/dns/dns_resolver.go
源码,采用 Builder 设计模式:
package ns
import (
"context"
"fmt"
"google.golang.org/grpc/resolver"
)
// init 将定义好的 NS Builder 注册到 resolver 包中
func init() {
resolver.Register(NewBuilder())
}
// NewBuilder 构造 nsResolverBuilder
func NewBuilder() resolver.Builder {
return &nsResolverBuilder{}
}
// nsResolverBuilder 实现了 resolver.Builder 接口,用来构造定义好的 Resolver Bulder
type nsResolverBuilder struct{}
// URI 返回某个服务的统一资源描述符(URI),这个 URI 可以从 nsResolver 中查询实例列表
// URI 设计时可以遵循 RFC-3986(https://tools.ietf.org/html/rfc3986) 规范,
// 比如本例中 ns 格式为:ns://callerService:@calleeService
// 其中 ns 为协议名,callerService 为订阅方服务名(即主调方服务名),calleeService 为发布方服务名(即被调方服务名)
func URI(callerService, calleeService string) string {
return fmt.Sprintf("ns://%s:@%s", callerService, calleeService)
}
// Build 实现了 resolver.Builder.Build 方法
func (*nsResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
ctx, cancel := context.WithCancel(context.Background())
r := &nsResolver{
target: target,
cc: cc,
ctx: ctx,
cancel: cancel,
}
// 启动协程,响应指定 Name 服务实例变化
go r.watcher()
return r, nil
}
// Scheme 实现了 resolver.Builder.Scheme 方法
// Scheme 方法定义了 ns resolver 的协议名
func (*nsResolverBuilder) Scheme() string {
return "ns"
}
05
封装 gRPC.Dial() 方法
nsResolver
nsResolverBuilder
后,我们还需要对
grpc.Dial()
方法进行封装,方便业务方适用。封装后
dial.go
代码如下所示(
严格来说
dial.go
不应该放在
ns
包中,本例中这么做只是为简化包布局,方便读者理解
)
:
package ns
// Dial 封装 `grpc.Dial()` 方法以供业务方代码初始化 *grpc.ClientConn。
// 业务方可使用此 Dial 方法基于主调方服务名、被调方服务名等参数构造 *grpc.ClientConn 实例,
// 随后可在业务代码中使用 *grpc.ClientConn 实例构造桩代码中生成的 grpcServiceClient 并发起 RPC 调用。
func Dial(callerService, calleeService string, dialOpts ...grpc.DialOption) (*grpc.ClientConn, error) {
// 根据 callerService 和 calleeService 构造对应的 URI
URI := ns.URI(callerService, calleeService)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 设置拨号配置
opts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithInsecure(),
}
dialOpts = append(dialOpts, dialOpts...)
// 调用 grpc.DialContext() 方法拨号
conn, err := grpc.DialContext(
ctx,
URI,
opts...,
)
if err != nil {
logz.Warn("did not connect", logz.Any("target", URI), logz.E(err))
return nil, err
}
return conn, err
}
06
gRPC resolver 原理
ns
中定义了两个 go 文件,
resolver.go
和
resolver_builder.go
。
-
前者是整个功能最核心的代码,通过自定义 nsResolver
将服务名解析成对应实例。 -
后者是采用 Builder 模式在包初始化时创建并注册构造 nsResover
的nsResolverBuilder
实例。当客户端通过Dial
方法对指定服务进行拨号时,grpc resolver 查找注册的 Builder 实例调用其Build()
方法构建自定义nsResolver
。
demo.pb
文件:
syntax = "proto3";
package demo;
service DemoService {
rpc SayHi(HiRequest) returns (HiResponse);
}
message HiRequest {
string name = 1;
}
message HiResponse {
string message = 1;
}
demo.pb.go
可能如下:
package demo
...
type DemoServiceClient interface {
SayHiOK(ctx context.Context, in *HiRequest, opts ...grpc.CallOption) (*HiResponse, error)
}
type demoServiceClient struct {
cc *grpc.ClientConn
}
// NewDemoServiceClient 业务代码中此方法来构造 *demoServiceClient 实例
func NewDemoServiceClient(cc *grpc.ClientConn) DemoServiceClient {
return &demoServiceClient{cc}
}
func (c *demoServiceClient) SayHiOK(ctx context.Context, in *HiRequest, opts ...grpc.CallOption) (*HiResponse, error) {
out := new(HiResponse)
err := c.cc.Invoke(ctx, "/proto.DemoService/SayHiOK", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
...
*grpc.ClientConn
再发起 RPC 调用代码如下:
import "mypkg/internal/ns"
...
// 使用上节中封装的 ns.Dial 方法构造 *grpc.ClientConn
conn, _ := ns.Dial("my-caller-service", "my-callee-service")
// 构造 *demoServiceClient
cli := demo.NewDemoServiceClient(conn)
// 使用 *demoServiceClient 发起 RPC 调用
res, _ := cli.SayHiOK(ctx, &proto.HiRequest{Name: "world"})
...
import "mypkg/internal/ns"
包后,在
ns/resolver_builder.go
的 init 阶段会通过
Register()
方法将
nsResolverBuilder
注册到 grpc 内部的一个全局 map 中:
// m 定义为一个全局 map,用于存放 [resolver 协议名 -> resolverBuilder] 键值对
var m = make(map[string]Builder)
// Register 方法将指定 [resolver 协议名 -> resolverBuilder] 键值对存入 map
func Register(b Builder) {
m[b.Scheme()] = b
}
// Get 方法根据传入的 resolver 协议名返回对应的 resolverBuilder
func Get(scheme string) Builder {
if b, ok := m[scheme]; ok {
return b
}
return nil
}
ns.Dial()
方法使用 callerService 和 calleeService 构造服务 URI,并使用此 URI 作为参数调用
grpc.DialContext()
方法,来构造
*grpc.ClientConn
实例。
grpc.DialContext()
方法接收三个参数:ctx、target、opts,
ns://my-caller-service:@my-callee-service
,其中
ns
为协议名。grpc 可通过协议名查表来获取对应的 resolverBuilder。opts:是一个变长参数,表示拨号配置选项。
grpc.DialContext()
内部逻辑比较复杂,我们挑重点讲:
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
// 构造 ClientConn 实例
cc := &ClientConn{
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
czData: new(channelzData),
firstResolveEvent: grpcsync.NewEvent(),
}
cc.retryThrottler.Store((*retryThrottler)(nil))
cc.ctx, cc.cancel = context.WithCancel(context.Background())
for _, opt := range opts {
opt.apply(&cc.dopts)
}
...
// 如果用户指定了 timeout 超时配置,那么初始化一个带超时的 ctx
// 如果 defer 阶段已超时,则抛出 j 错误
if cc.dopts.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
defer cancel()
}
defer func() {
select {
case <-ctx.Done():
conn, err = nil, ctx.Err()
default:
}
}()
...
// Name Resolver 核心逻辑,初始化 resolverBuilder,代码中首先会判断下用户是否指定 resolverBuilder
// - 如果有指定 resolverBuilder,则直接使用此 resolverBuilder。
// - 如果用户没有指定 resolverBuilder,那么 grpc 做如下操作:
// - 通过 parseTarget 方法解析用户传入的 target,本例中即 `ns://my-caller-service:@my-callee-service`,获取 Scheme(协议名)、authority(包含 callerService、calleeService)。
// - 查询指定协议对应的 resolverBuilder。
if cc.dopts.resolverBuilder == nil {
// 解析用户传入的 target
cc.parsedTarget = parseTarget(cc.target)
// 通过协议名查表获取对应的 resolverBuilder
cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
// 如果表中没查到对应的 resolverBuilder,则使用默认协议查询对应的 resolverBuilder
// 默认协议为 `passthrough`,它会从用户解析的 target 中直接读取 endpoint 地址
if cc.dopts.resolverBuilder == nil {
cc.parsedTarget = resolver.Target{
Scheme: resolver.GetDefaultScheme(),
Endpoint: target,
}
cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
}
} else {
cc.parsedTarget = resolver.Target{Endpoint: target}
}
...
// 使用上面初始化的 resolverBuilder 构建 resolver
// 初始化 resolverWrapper
rWrapper, err := newCCResolverWrapper(cc)
if err != nil {
return nil, fmt.Errorf("failed to build resolver: %v", err)
}
cc.mu.Lock()
cc.resolverWrapper = rWrapper
cc.mu.Unlock()
// 如果客户端配置了 WithBlock option,则会轮询 ClientConn 状态,如果 ClientConn 就绪,则返回 ClientConn。
// 如果直到 ctx 超时或被 Cancel ClientConn 依然未就绪,则抛出 ctx.Err() 错误。
if cc.dopts.block {
for {
s := cc.GetState()
// 1. 如果 ClientConn 状态为 Ready 则返回此 ClientConn
// 2. 如果 ClientConn 状态并非 Ready,且用户配置了 FailOnNonTempDialError,当前 ClientConn 状态为 TransientFailure,且 lbPicker 尝试和服务端实例建立连接时产生错误。根据错误性质做如下处理:
// 2.1. 如果此错误是非临时性的错误,则抛出此错误
// 2.2. 如果此错误是临时性的错误,则继续轮询 ClientConn 状态,直至 ctx 超时或被外部 Cancel
if s == connectivity.Ready {
break
} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
if err = cc.blockingpicker.connectionError(); err != nil {
terr, ok := err.(interface {
Temporary() bool
})
if ok && !terr.Temporary() {
return nil, err
}
}
}
if !cc.WaitForStateChange(ctx, s) {
return nil, ctx.Err()
}
}
}
return cc, nil
}
newCCResolverWrapper()
方法内部实现:
func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {
...
ccr := &ccResolverWrapper{
cc: cc,
addrCh: make(chan []resolver.Address, 1),
scCh: make(chan string, 1),
}
var err error
// rb.Build() 调用指定 resolveBuilder 的 Build 方法,本例中会执行我们定义的 nsResolverBuilder.Builder() 方法
ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, resolver.BuildOption{DisableServiceConfig: cc.dopts.disableServiceConfig})
if err != nil {
return nil, err
}
return ccr, nil
}
ns/resolver_builder.go
中已经给出了
nsResolverBuilder
实现,我们再看下
nsResolverBuilder.Builder()
方法内部逻辑:
package ns
// init 将定义好的 NS Builder 注册到 resolver 包中
func init() {
resolver.Register(NewBuilder())
}
...
func (*nsResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
ctx, cancel := context.WithCancel(context.Background())
r := &nsResolver{
target: target,
cc: cc,
ctx: ctx,
cancel: cancel,
}
// 启动协程,轮询并更新指定 CalleeService 服务的实例变化
go r.watcher()
return r, nil
}
...
ns/resolver.go
中给出了
nsResolver
实现,我们再看下
nsResolver.watch()
方法:
package ns
...
// watcher 轮询并更新指定 CalleeService 服务的实例变化
func (r *nsResolver) watcher() {
r.updateCC()
ticker := time.NewTicker(syncNSInterval)
for {
select {
// 当* nsResolver Close 时退出
case <-r.ctx.Done():
ticker.Stop()
return
case <-ticker.C:
// 调用* nsResolver.updagteCC() 方法,更新实例地址
r.updateCC()
}
}
}
// updateCC 更新 resolver.Resolver.ClientConn 配置
func (r *nsResolver) updateCC() {
// 从 NS 服务获取指定 target 的实例列表
instances, err := r.getInstances(r.target)
// 如果获取实例列表失败,或者实例列表为空,则不更新 resolver 中实例列表
if err != nil || len(instances.CalleeIns) == 0 {
logz.Warn("[mis] error retrieving instances from Mis", logz.Any("target", r.target), logz.Error(err))
return
}
...
// 组装实例列表 []resolver.Address
// resolver.Address 结构体表示 grpc server 端实例地址
var newAddrs []resolver.Address
for k := range instances.calleeIns {
newAddrs = append(newAddrs, resolver.Address{
Addr: instances.CalleeIns[k],
})
}
...
// 更新实例列表
// grpc 底层 LB 组件对每个服务端实例创建一个 subConnection。并根据设定的 LB 策略,选择合适的 subConnection 处理某次 RPC 请求。
// 此处代码比较复杂,后续在 LB 相关原理文章中再做概述。
r.cc.UpdateState(resolver.State{Addresses: newAddrs})
}
...
07
总结
-
客户端启动时,引入自定义的 resolver 包(比如本例中我们自定义的 ns
包) -
引入 ns
包,在init()
阶段,构造自定义的 resolveBuilder,并将其注册到 grpc 内部的 resolveBuilder 表中(其实是一个全局 map,key 为协议名,比如ns
;value 为构造的 resolveBuilder,比如nsResolverBuilder
)。 -
客户端启动时通过自定义 Dail()
方法构造 grpc.ClientConn 单例 -
grpc.DialContext()
方法内部解析 URI,分析协议类型,并从 resolveBuilder 表中查找协议对应的 resolverBuilder。比如本例中我们定义的 URI 协议类型为ns
,对应的 resolverBuilder 为nsResolverBuilder
-
找到指定的 resolveBuilder 后,调用 resolveBuilder 的 Build()
方法,构建自定义 resolver,同时开启协程,通过此 resolver 更新被调服务实例列表。 -
Dial()
方法接收主调服务名和被调服务名,并根据自定义的协议名,基于这两个参数构造服务的 URI -
Dial()
方法内部使用构造的 URI,调用grpc.DialContext()
方法对指定服务进行拨号 -
grpc 底层 LB 库对每个实例均创建一个 subConnection,最终根据相应的 LB 策略,选择合适的 subConnection 处理某次 RPC 请求。
聪明又努力的 Gophers,让我知道你“在看”
以上是关于小米出品——gRPC Name Resolver 原理及实践的主要内容,如果未能解决你的问题,请参考以下文章
小米最新估值或超800亿美元; 抖音首次回应微博封杀;滴滴获批100亿元ABS额度;NGINX 宣布支持 gRPC...