Grpc原理
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Grpc原理相关的知识,希望对你有一定的参考价值。
参考技术A rpc调用原理框架如图:-支持多语言的rpc框架,例如Google的grpc,facebook thrift, 百度的brpc
-支持特定语言的rpc框架, 例如新浪微博的Motan
-支持服务治理微服务化特性框架,其底层仍是rpc框架,例如 阿里的Dubbo
目前业内主要使用基于多语言的 RPC 框架来构建微服务,是一种比较好的技术选择,例如netflix ,API服务编排层和后端微服务之间采用微服务rpc进行通信
-支持C java js
-git地址 https://github.com/grpc/grpc-java
-原理图:
gRPC 的线程模型遵循 Netty 的线程分工原则,即:协议层消息的接收和编解码由 Netty 的 I/O(NioEventLoop) 线程负责;后续应用层的处理由应用线程负责,防止由于应用处理耗时而阻塞 Netty 的 I/O 线程
不过正是因为有了分工原则,grpc 之间会做频繁的线程切换,如果在一次grpc调用过程中,做了多次I/O线程到应用线程之间的切换,会导致性能的下降,这也是为什么grpc在一些私有协议支持不太友好的原因
缺点
改进:
优化后BIO线程模型采用了线程池的做法但是后端的应用处理线程仍然采用同步阻塞的模型,阻塞的时间取决对方I/O处理的速度和网络I/O传输的速度
grpc的线程模型主要包含服务端线程模型,客户端线程模型
2.1.1 I/O 通信线程模型
gRPC的做法是服务监听线程和I/O线程分离Reactor多个线程模型 其工作原理如下:
2.1.2 服务调度线程模型
gRPC 客户端线程模型工作原理如下图所示(同步阻塞调用为例)
相比于服务端,客户端的线程模型简单一些,它的工作原理如下:
grpc 线程模型
小米出品——gRPC Name Resolver 原理及实践
本文字数:5380 字
精读时间:12 分钟
也可在 6 分钟内完成速读
01
前言
02
实现自定义 Name Resolver
resolver.go
、
resolver_build.go
、
dail.go
。
ns # 自定义 resolver 包名
├── dial.go # 封装了 gRPC 包的 grpc.DialContext() 方法,严格来说 dail.go 不应该放在 ns 包下,本例中这么做只是为简化包布局,方便读者理解
├── resolver.go # 实现了 gRPC resolver 包 Resolver 接口的 nsResolver
└── resolver_builder.go # 实现了 gRPC resolver 包 ResolverBuilder 接口的 nsResolverBuilder
03
定义 nsResolver
resolver.go
:
package ns
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
"mypkg/internal/logz" // 私有日志包,基于 uber 开源的 zap 实现
sdk "mypkg/internal/soa-sdk" // 私有 ns sdk 包,封装了内部 soa 平台进行服务发现的 sdk
_ "google.golang.org/grpc"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
const (
// syncNSInterval 定义了从 NS 服务同步实例列表的周期
syncNSInterval = 1 * time.Second
)
// nsResolver 实现了 resolver.Resolver 接口
type nsResolver struct {
target resolver.Target
cc resolver.ClientConn
ctx context.Context
cancel context.CancelFunc
...
}
// watcher 轮询并更新指定 CalleeService 服务的实例变化
func (r *nsResolver) watcher() {
r.updateCC()
ticker := time.NewTicker(syncNSInterval)
for {
select {
// 当* nsResolver Close 时退出监听
case <-r.ctx.Done():
ticker.Stop()
return
case <-ticker.C:
// 调用* nsResolver.updagteCC() 方法,更新实例地址
r.updateCC()
}
}
}
// updateCC 更新 resolver.Resolver.ClientConn 配置
func (r *nsResolver) updateCC() {
// 从 NS 服务获取指定 target 的实例列表
instances, err := r.getInstances(r.target)
// 如果获取实例列表失败,或者实例列表为空,则不更新 resolver 中实例列表
if err != nil || len(instances.CalleeIns) == 0 {
logz.Warn("[mis] error retrieving instances from Mis", logz.Any("target", r.target), logz.Error(err))
return
}
...
// 组装实例列表 []resolver.Address
// resolver.Address 结构体表示 grpc server 端实例地址
var newAddrs []resolver.Address
for k := range instances.CalleeIns {
newAddrs = append(newAddrs, instances.CalleeIns)
}
...
// 更新实例列表
// grpc 底层 LB 组件对每个服务端实例创建一个 subConnection。并根据设定的 LB 策略,选择合适的 subConnection 处理某次 RPC 请求。
// 此处代码比较复杂,后续在 LB 相关原理文章中再做概述
r.cc.UpdateState(resolver.State{Addresses: newAddrs})
}
// ResolveNow 实现了 resolver.Resolver.ResolveNow 方法
func (*nsResolver) ResolveNow(o resolver.ResolveNowOption) {}
// Close 实现了 resolver.Resolver.Close 方法
func (r *nsResolver) Close() {
r.cancel()
}
// instances 包含调用方服务名、被调方服务名、被调方实例列表等数据
type instances struct {
callerService string
calleeService string
calleeIns []string
}
// getInstances 获取指定服务所有可用的实例列表
func (r *nsResolver) getInstances(target resolver.Target) (s *instances, e error) {
auths := strings.Split(target.Authority, "@")
// auths[0] 为 callerService 名,target.Endpoint 为 calleeService 名
// 通过自定义 sdk 从内部 NameServer 查询指定 calleeService 对应的实例列表
ins, e := sdk.GetInstances(auths[0], target.Endpoint)
if e != nil {
return nil, e
}
return &instances{
callerService: auths[0],
calleeService: target.Endpoint,
calleeIns: ins.Instances,
}, nil
}
04
定义 nsResolverBuilder
ns/resolver_builder.go
构建 nsResolver 时,我们参考
google.golang.org/grpc/resolver/dns/dns_resolver.go
源码,采用 Builder 设计模式:
package ns
import (
"context"
"fmt"
"google.golang.org/grpc/resolver"
)
// init 将定义好的 NS Builder 注册到 resolver 包中
func init() {
resolver.Register(NewBuilder())
}
// NewBuilder 构造 nsResolverBuilder
func NewBuilder() resolver.Builder {
return &nsResolverBuilder{}
}
// nsResolverBuilder 实现了 resolver.Builder 接口,用来构造定义好的 Resolver Bulder
type nsResolverBuilder struct{}
// URI 返回某个服务的统一资源描述符(URI),这个 URI 可以从 nsResolver 中查询实例列表
// URI 设计时可以遵循 RFC-3986(https://tools.ietf.org/html/rfc3986) 规范,
// 比如本例中 ns 格式为:ns://callerService:@calleeService
// 其中 ns 为协议名,callerService 为订阅方服务名(即主调方服务名),calleeService 为发布方服务名(即被调方服务名)
func URI(callerService, calleeService string) string {
return fmt.Sprintf("ns://%s:@%s", callerService, calleeService)
}
// Build 实现了 resolver.Builder.Build 方法
func (*nsResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
ctx, cancel := context.WithCancel(context.Background())
r := &nsResolver{
target: target,
cc: cc,
ctx: ctx,
cancel: cancel,
}
// 启动协程,响应指定 Name 服务实例变化
go r.watcher()
return r, nil
}
// Scheme 实现了 resolver.Builder.Scheme 方法
// Scheme 方法定义了 ns resolver 的协议名
func (*nsResolverBuilder) Scheme() string {
return "ns"
}
05
封装 gRPC.Dial() 方法
nsResolver
nsResolverBuilder
后,我们还需要对
grpc.Dial()
方法进行封装,方便业务方适用。封装后
dial.go
代码如下所示(
严格来说
dial.go
不应该放在
ns
包中,本例中这么做只是为简化包布局,方便读者理解
)
:
package ns
// Dial 封装 `grpc.Dial()` 方法以供业务方代码初始化 *grpc.ClientConn。
// 业务方可使用此 Dial 方法基于主调方服务名、被调方服务名等参数构造 *grpc.ClientConn 实例,
// 随后可在业务代码中使用 *grpc.ClientConn 实例构造桩代码中生成的 grpcServiceClient 并发起 RPC 调用。
func Dial(callerService, calleeService string, dialOpts ...grpc.DialOption) (*grpc.ClientConn, error) {
// 根据 callerService 和 calleeService 构造对应的 URI
URI := ns.URI(callerService, calleeService)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 设置拨号配置
opts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithInsecure(),
}
dialOpts = append(dialOpts, dialOpts...)
// 调用 grpc.DialContext() 方法拨号
conn, err := grpc.DialContext(
ctx,
URI,
opts...,
)
if err != nil {
logz.Warn("did not connect", logz.Any("target", URI), logz.E(err))
return nil, err
}
return conn, err
}
06
gRPC resolver 原理
ns
中定义了两个 go 文件,
resolver.go
和
resolver_builder.go
。
-
前者是整个功能最核心的代码,通过自定义 nsResolver
将服务名解析成对应实例。 -
后者是采用 Builder 模式在包初始化时创建并注册构造 nsResover
的nsResolverBuilder
实例。当客户端通过Dial
方法对指定服务进行拨号时,grpc resolver 查找注册的 Builder 实例调用其Build()
方法构建自定义nsResolver
。
demo.pb
文件:
syntax = "proto3";
package demo;
service DemoService {
rpc SayHi(HiRequest) returns (HiResponse);
}
message HiRequest {
string name = 1;
}
message HiResponse {
string message = 1;
}
demo.pb.go
可能如下:
package demo
...
type DemoServiceClient interface {
SayHiOK(ctx context.Context, in *HiRequest, opts ...grpc.CallOption) (*HiResponse, error)
}
type demoServiceClient struct {
cc *grpc.ClientConn
}
// NewDemoServiceClient 业务代码中此方法来构造 *demoServiceClient 实例
func NewDemoServiceClient(cc *grpc.ClientConn) DemoServiceClient {
return &demoServiceClient{cc}
}
func (c *demoServiceClient) SayHiOK(ctx context.Context, in *HiRequest, opts ...grpc.CallOption) (*HiResponse, error) {
out := new(HiResponse)
err := c.cc.Invoke(ctx, "/proto.DemoService/SayHiOK", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
...
*grpc.ClientConn
再发起 RPC 调用代码如下:
import "mypkg/internal/ns"
...
// 使用上节中封装的 ns.Dial 方法构造 *grpc.ClientConn
conn, _ := ns.Dial("my-caller-service", "my-callee-service")
// 构造 *demoServiceClient
cli := demo.NewDemoServiceClient(conn)
// 使用 *demoServiceClient 发起 RPC 调用
res, _ := cli.SayHiOK(ctx, &proto.HiRequest{Name: "world"})
...
import "mypkg/internal/ns"
包后,在
ns/resolver_builder.go
的 init 阶段会通过
Register()
方法将
nsResolverBuilder
注册到 grpc 内部的一个全局 map 中:
// m 定义为一个全局 map,用于存放 [resolver 协议名 -> resolverBuilder] 键值对
var m = make(map[string]Builder)
// Register 方法将指定 [resolver 协议名 -> resolverBuilder] 键值对存入 map
func Register(b Builder) {
m[b.Scheme()] = b
}
// Get 方法根据传入的 resolver 协议名返回对应的 resolverBuilder
func Get(scheme string) Builder {
if b, ok := m[scheme]; ok {
return b
}
return nil
}
ns.Dial()
方法使用 callerService 和 calleeService 构造服务 URI,并使用此 URI 作为参数调用
grpc.DialContext()
方法,来构造
*grpc.ClientConn
实例。
grpc.DialContext()
方法接收三个参数:ctx、target、opts,
ns://my-caller-service:@my-callee-service
,其中
ns
为协议名。grpc 可通过协议名查表来获取对应的 resolverBuilder。opts:是一个变长参数,表示拨号配置选项。
grpc.DialContext()
内部逻辑比较复杂,我们挑重点讲:
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
// 构造 ClientConn 实例
cc := &ClientConn{
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
czData: new(channelzData),
firstResolveEvent: grpcsync.NewEvent(),
}
cc.retryThrottler.Store((*retryThrottler)(nil))
cc.ctx, cc.cancel = context.WithCancel(context.Background())
for _, opt := range opts {
opt.apply(&cc.dopts)
}
...
// 如果用户指定了 timeout 超时配置,那么初始化一个带超时的 ctx
// 如果 defer 阶段已超时,则抛出 j 错误
if cc.dopts.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
defer cancel()
}
defer func() {
select {
case <-ctx.Done():
conn, err = nil, ctx.Err()
default:
}
}()
...
// Name Resolver 核心逻辑,初始化 resolverBuilder,代码中首先会判断下用户是否指定 resolverBuilder
// - 如果有指定 resolverBuilder,则直接使用此 resolverBuilder。
// - 如果用户没有指定 resolverBuilder,那么 grpc 做如下操作:
// - 通过 parseTarget 方法解析用户传入的 target,本例中即 `ns://my-caller-service:@my-callee-service`,获取 Scheme(协议名)、authority(包含 callerService、calleeService)。
// - 查询指定协议对应的 resolverBuilder。
if cc.dopts.resolverBuilder == nil {
// 解析用户传入的 target
cc.parsedTarget = parseTarget(cc.target)
// 通过协议名查表获取对应的 resolverBuilder
cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
// 如果表中没查到对应的 resolverBuilder,则使用默认协议查询对应的 resolverBuilder
// 默认协议为 `passthrough`,它会从用户解析的 target 中直接读取 endpoint 地址
if cc.dopts.resolverBuilder == nil {
cc.parsedTarget = resolver.Target{
Scheme: resolver.GetDefaultScheme(),
Endpoint: target,
}
cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
}
} else {
cc.parsedTarget = resolver.Target{Endpoint: target}
}
...
// 使用上面初始化的 resolverBuilder 构建 resolver
// 初始化 resolverWrapper
rWrapper, err := newCCResolverWrapper(cc)
if err != nil {
return nil, fmt.Errorf("failed to build resolver: %v", err)
}
cc.mu.Lock()
cc.resolverWrapper = rWrapper
cc.mu.Unlock()
// 如果客户端配置了 WithBlock option,则会轮询 ClientConn 状态,如果 ClientConn 就绪,则返回 ClientConn。
// 如果直到 ctx 超时或被 Cancel ClientConn 依然未就绪,则抛出 ctx.Err() 错误。
if cc.dopts.block {
for {
s := cc.GetState()
// 1. 如果 ClientConn 状态为 Ready 则返回此 ClientConn
// 2. 如果 ClientConn 状态并非 Ready,且用户配置了 FailOnNonTempDialError,当前 ClientConn 状态为 TransientFailure,且 lbPicker 尝试和服务端实例建立连接时产生错误。根据错误性质做如下处理:
// 2.1. 如果此错误是非临时性的错误,则抛出此错误
// 2.2. 如果此错误是临时性的错误,则继续轮询 ClientConn 状态,直至 ctx 超时或被外部 Cancel
if s == connectivity.Ready {
break
} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
if err = cc.blockingpicker.connectionError(); err != nil {
terr, ok := err.(interface {
Temporary() bool
})
if ok && !terr.Temporary() {
return nil, err
}
}
}
if !cc.WaitForStateChange(ctx, s) {
return nil, ctx.Err()
}
}
}
return cc, nil
}
newCCResolverWrapper()
方法内部实现:
func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {
...
ccr := &ccResolverWrapper{
cc: cc,
addrCh: make(chan []resolver.Address, 1),
scCh: make(chan string, 1),
}
var err error
// rb.Build() 调用指定 resolveBuilder 的 Build 方法,本例中会执行我们定义的 nsResolverBuilder.Builder() 方法
ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, resolver.BuildOption{DisableServiceConfig: cc.dopts.disableServiceConfig})
if err != nil {
return nil, err
}
return ccr, nil
}
ns/resolver_builder.go
中已经给出了
nsResolverBuilder
实现,我们再看下
nsResolverBuilder.Builder()
方法内部逻辑:
package ns
// init 将定义好的 NS Builder 注册到 resolver 包中
func init() {
resolver.Register(NewBuilder())
}
...
func (*nsResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
ctx, cancel := context.WithCancel(context.Background())
r := &nsResolver{
target: target,
cc: cc,
ctx: ctx,
cancel: cancel,
}
// 启动协程,轮询并更新指定 CalleeService 服务的实例变化
go r.watcher()
return r, nil
}
...
ns/resolver.go
中给出了
nsResolver
实现,我们再看下
nsResolver.watch()
方法:
package ns
...
// watcher 轮询并更新指定 CalleeService 服务的实例变化
func (r *nsResolver) watcher() {
r.updateCC()
ticker := time.NewTicker(syncNSInterval)
for {
select {
// 当* nsResolver Close 时退出
case <-r.ctx.Done():
ticker.Stop()
return
case <-ticker.C:
// 调用* nsResolver.updagteCC() 方法,更新实例地址
r.updateCC()
}
}
}
// updateCC 更新 resolver.Resolver.ClientConn 配置
func (r *nsResolver) updateCC() {
// 从 NS 服务获取指定 target 的实例列表
instances, err := r.getInstances(r.target)
// 如果获取实例列表失败,或者实例列表为空,则不更新 resolver 中实例列表
if err != nil || len(instances.CalleeIns) == 0 {
logz.Warn("[mis] error retrieving instances from Mis", logz.Any("target", r.target), logz.Error(err))
return
}
...
// 组装实例列表 []resolver.Address
// resolver.Address 结构体表示 grpc server 端实例地址
var newAddrs []resolver.Address
for k := range instances.calleeIns {
newAddrs = append(newAddrs, resolver.Address{
Addr: instances.CalleeIns[k],
})
}
...
// 更新实例列表
// grpc 底层 LB 组件对每个服务端实例创建一个 subConnection。并根据设定的 LB 策略,选择合适的 subConnection 处理某次 RPC 请求。
// 此处代码比较复杂,后续在 LB 相关原理文章中再做概述。
r.cc.UpdateState(resolver.State{Addresses: newAddrs})
}
...
07
总结
-
客户端启动时,引入自定义的 resolver 包(比如本例中我们自定义的 ns
包) -
引入 ns
包,在init()
阶段,构造自定义的 resolveBuilder,并将其注册到 grpc 内部的 resolveBuilder 表中(其实是一个全局 map,key 为协议名,比如ns
;value 为构造的 resolveBuilder,比如nsResolverBuilder
)。 -
客户端启动时通过自定义 Dail()
方法构造 grpc.ClientConn 单例 -
grpc.DialContext()
方法内部解析 URI,分析协议类型,并从 resolveBuilder 表中查找协议对应的 resolverBuilder。比如本例中我们定义的 URI 协议类型为ns
,对应的 resolverBuilder 为nsResolverBuilder
-
找到指定的 resolveBuilder 后,调用 resolveBuilder 的 Build()
方法,构建自定义 resolver,同时开启协程,通过此 resolver 更新被调服务实例列表。 -
Dial()
方法接收主调服务名和被调服务名,并根据自定义的协议名,基于这两个参数构造服务的 URI -
Dial()
方法内部使用构造的 URI,调用grpc.DialContext()
方法对指定服务进行拨号 -
grpc 底层 LB 库对每个实例均创建一个 subConnection,最终根据相应的 LB 策略,选择合适的 subConnection 处理某次 RPC 请求。
聪明又努力的 Gophers,让我知道你“在看”
以上是关于Grpc原理的主要内容,如果未能解决你的问题,请参考以下文章