grpc client 源码分析

Posted golang算法架构leetcode技术php

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了grpc client 源码分析相关的知识,希望对你有一定的参考价值。

grpc client 代码非常简洁,分三步

1,获取连接

2,初始化客户端

3,发送请求

conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())defer conn.Close()
c := pb.NewGreeterClient(conn)
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})

首先看下发送请求

type GreeterClient interface { // Sends a greeting SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error)}
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) { out := new(HelloReply) err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...) if err != nil { return nil, err } return out, nil}

调用了cc的Invoke方法

type greeterClient struct { cc grpc.ClientConnInterface}

ClientConnInterface的定义如下

type ClientConnInterface interface { // Invoke performs a unary RPC and returns after the response is received // into reply. Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error // NewStream begins a streaming RPC. NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)}

接口包含了两个方法Invoke和NewStream定义在clientconn.go中


接着看下client初始化的代码

func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient { return &greeterClient{cc}}

仅仅把connet interface传给了client


最后看下获取连接的实现

func Dial(target string, opts ...DialOption) (*ClientConn, error) { return DialContext(context.Background(), target, opts...)}

clientconn.go的Dial方法返回了ClientConn指针

func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { cc := &ClientConn{ target: target, csMgr: &connectivityStateManager{}, conns: make(map[*addrConn]struct{}), dopts: defaultDialOptions(), blockingpicker: newPickerWrapper(), czData: new(channelzData), firstResolveEvent: grpcsync.NewEvent(), } ....cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme) .... cc.balancerBuildOpts = balancer.BuildOptions{ DialCreds: credsClone, CredsBundle: cc.dopts.copts.CredsBundle, Dialer: cc.dopts.copts.Dialer, CustomUserAgent: cc.dopts.copts.UserAgent, ChannelzParentID: cc.channelzID, Target: cc.parsedTarget, } .....rWrapper, err := newCCResolverWrapper(cc, resolverBuilder) }

其中ClientConn的结构体定义如下

type ClientConn struct { ctx context.Context cancel context.CancelFunc
target string parsedTarget resolver.Target authority string dopts dialOptions csMgr *connectivityStateManager
balancerBuildOpts balancer.BuildOptions blockingpicker *pickerWrapper
safeConfigSelector iresolver.SafeConfigSelector
mu sync.RWMutex resolverWrapper *ccResolverWrapper sc *ServiceConfig conns map[*addrConn]struct{} // Keepalive parameter can be updated if a GoAway is received. mkp keepalive.ClientParameters curBalancerName string balancerWrapper *ccBalancerWrapper retryThrottler atomic.Value
firstResolveEvent *grpcsync.Event
channelzID int64 // channelz unique identification number czData *channelzData
lceMu sync.Mutex // protects lastConnectionError lastConnectionError error}

可以看出Dial仅仅做了connection的初始化


call.go里定义了Invoke方法

func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error { // allow interceptor to see all applicable call options, which means those // configured as defaults from dial option as well as per-call options opts = combine(cc.dopts.callOptions, opts)
if cc.dopts.unaryInt != nil { return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...) } return invoke(ctx, method, args, reply, cc, opts...)}
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error { cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...) if err != nil { return err } if err := cs.SendMsg(req); err != nil { return err } return cs.RecvMsg(reply)}

里面分了三步,建立连接,发送请求,获取结果

newClientStream 函数定义在stream.go文件里

func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) { rpcConfig, err := cc.safeConfigSelector.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: method}) callHdr := &transport.CallHdr{ Host: cc.authority, Method: method, ContentSubtype: c.contentSubtype, } cs := &clientStream{ callHdr: callHdr, ctx: ctx, methodConfig: &mc, opts: opts, callInfo: c, cc: cc, desc: desc, codec: c.codec, cp: cp, comp: comp, cancel: cancel, beginTime: beginTime, firstAttempt: true, onCommit: onCommit, } if err := cs.newAttemptLocked(sh, trInfo); err != nil { cs.finish(err) return nil, err }}
func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) { newAttempt := &csAttempt{ cs: cs, dc: cs.cc.dopts.dc, statsHandler: sh, trInfo: trInfo, } t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method) }

getTransport 定义在clientconn.go中

func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) { t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{ Ctx: ctx, FullMethodName: method, }) if err != nil { return nil, nil, toRPCErr(err) } return t, done, nil}

pick函数定义在picker_warper.go

func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {for{pickResult, err := p.Pick(info)if t, ok := acw.getAddrConn().getReadyTransport(); ok {}}}

在stream.go文件里定义了ClientStream的接口

type ClientStream interface {// Header returns the header metadata received from the server if there // is any. It blocks if the metadata is not ready to read. Header() (metadata.MD, error) // Trailer returns the trailer metadata from the server, if there is any. // It must only be called after stream.CloseAndRecv has returned, or // stream.Recv has returned a non-nil error (including io.EOF). Trailer() metadata.MD // CloseSend closes the send direction of the stream. It closes the stream // when non-nil error is met. It is also not safe to call CloseSend // concurrently with SendMsg. CloseSend() error // Context returns the context for this stream. // // It should not be called until after Header or RecvMsg has returned. Once // called, subsequent client-side retries are disabled. Context() context.Context // SendMsg is generally called by generated code. On error, SendMsg aborts // the stream. If the error was generated by the client, the status is // returned directly; otherwise, io.EOF is returned and the status of // the stream may be discovered using RecvMsg. // // SendMsg blocks until: // - There is sufficient flow control to schedule m with the transport, or // - The stream is done, or // - The stream breaks. // // SendMsg does not wait until the message is received by the server. An // untimely stream closure may result in lost messages. To ensure delivery, // users should ensure the RPC completed successfully using RecvMsg. // // It is safe to have a goroutine calling SendMsg and another goroutine // calling RecvMsg on the same stream at the same time, but it is not safe // to call SendMsg on the same stream in different goroutines. It is also // not safe to call CloseSend concurrently with SendMsg. SendMsg(m interface{}) error // RecvMsg blocks until it receives a message into m or the stream is // done. It returns io.EOF when the stream completes successfully. On // any other error, the stream is aborted and the error contains the RPC // status. // // It is safe to have a goroutine calling SendMsg and another goroutine // calling RecvMsg on the same stream at the same time, but it is not // safe to call RecvMsg on the same stream in different goroutines. RecvMsg(m interface{}) error}

clientstream实现了上述接口

type clientStream struct { callHdr *transport.CallHdr opts []CallOption callInfo *callInfo cc *ClientConn desc *StreamDesc
codec baseCodec cp Compressor comp encoding.Compressor
cancel context.CancelFunc // cancels all attempts
sentLast bool // sent an end stream beginTime time.Time
methodConfig *MethodConfig
ctx context.Context // the application's context, wrapped by stats/tracing
retryThrottler *retryThrottler // The throttler active when the RPC began.
binlog *binarylog.MethodLogger // Binary logger, can be nil. // serverHeaderBinlogged is a boolean for whether server header has been // logged. Server header will be logged when the first time one of those // happens: stream.Header(), stream.Recv(). // // It's only read and used by Recv() and Header(), so it doesn't need to be // synchronized. serverHeaderBinlogged bool
mu sync.Mutex firstAttempt bool // if true, transparent retry is valid numRetries int // exclusive of transparent retry attempt(s) numRetriesSincePushback int // retries since pushback; to reset backoff finished bool // TODO: replace with atomic cmpxchg or sync.Once? // attempt is the active client stream attempt. // The only place where it is written is the newAttemptLocked method and this method never writes nil. // So, attempt can be nil only inside newClientStream function when clientStream is first created. // One of the first things done after clientStream's creation, is to call newAttemptLocked which either // assigns a non nil value to the attempt or returns an error. If an error is returned from newAttemptLocked, // then newClientStream calls finish on the clientStream and returns. So, finish method is the only // place where we need to check if the attempt is nil. attempt *csAttempt // TODO(hedging): hedging will have multiple attempts simultaneously. committed bool // active attempt committed for retry? onCommit func() buffer []func(a *csAttempt) error // operations to replay on retry bufferSize int // current size of buffer}

实现了SendMsg和RecvMsg个方法

func (cs *clientStream) SendMsg(m interface{}) (err error) {hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp) op := func(a *csAttempt) error { err := a.sendMsg(m, hdr, payload, data) // nil out the message and uncomp when replaying; they are only needed for // stats which is disabled for subsequent attempts. m, data = nil, nil return err }}

func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) { data, err = encode(codec, m) compData, err := compress(data, cp, comp)  hdr, payload = msgHeader(data, compData)  }

实现了数据的编码压缩

紧接着发送数据

func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error { if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil { } if a.statsHandler != nil { a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now())) } }

最后是接受消息

func (cs *clientStream) RecvMsg(m interface{}) error { err := cs.withRetry(func(a *csAttempt) error { return a.recvMsg(m, recvInfo) }, cs.commitAttemptLocked)}
func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{ Client: true, RecvTime: time.Now(), Payload: m, // TODO truncate large payload. Data: payInfo.uncompressedBytes, WireLength: payInfo.wireLength + headerLen, Length: len(payInfo.uncompressedBytes), })err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp) }
func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error { d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor) if err != nil { return err } if err := c.Unmarshal(d, m); err != nil { return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err) } if payInfo != nil { payInfo.uncompressedBytes = d } return nil}


以上是关于grpc client 源码分析的主要内容,如果未能解决你的问题,请参考以下文章

gRPC 服务端和客户端源码分析(golang)

gRPC-go源码:ClientConn

gRPC-go源码:ClientConn

grpc-go客户端源码分析

grpc-go客户端源码分析

GRPC框架源码分析