grpc-go Client Source Code Analysis

The walkthrough below is based on grpc-go v1.37.0.

As with the grpc-go server-side source analysis, let's start with a sample program:

package main

import (
	"context"
	"log"
	"os"
	"time"

	"google.golang.org/grpc"
	pb "google.golang.org/grpc/examples/helloworld/helloworld"
)

const (
	address     = "localhost:50051"
	defaultName = "world"
)

func main() {
	// Set up a connection to the server.
	conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := pb.NewGreeterClient(conn)

	// Contact the server and print out its response.
	name := defaultName
	if len(os.Args) > 1 {
		name = os.Args[1]
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	log.Printf("Greeting: %s", r.GetMessage())
}

grpc.Dial is called first to create a grpc.ClientConn object; the actual initialization happens in grpc.DialContext.

DialContext first initializes an empty ClientConn, then iterates over the variadic opts ...DialOption; each option carries a function that is applied to the connection to set one specific property.

// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
	return DialContext(context.Background(), target, opts...)
}
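
Before moving on, it is worth seeing the functional-options pattern that opts ...DialOption is built on. Below is a minimal standalone sketch of the idea — illustrative names only; in v1.37.0 the real DialOption is an interface with an apply method rather than a bare function, but the mechanics are the same:

package main

import "fmt"

// dialOptions mirrors the idea of grpc's internal dialOptions struct
// (the fields here are illustrative, not grpc's actual fields).
type dialOptions struct {
	insecure bool
	block    bool
}

// DialOption configures dialOptions.
type DialOption func(*dialOptions)

func WithInsecure() DialOption { return func(o *dialOptions) { o.insecure = true } }
func WithBlock() DialOption    { return func(o *dialOptions) { o.block = true } }

func Dial(target string, opts ...DialOption) {
	o := dialOptions{}
	for _, opt := range opts { // each option sets one specific property
		opt(&o)
	}
	fmt.Printf("dialing %s with options %+v\n", target, o)
}

func main() {
	Dial("localhost:50051", WithInsecure(), WithBlock())
}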

The key step is the construction of the ClientConn object. Its struct fields are shown below.

// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.

type ClientConn struct {
	ctx    context.Context
	cancel context.CancelFunc

	target       string
	parsedTarget resolver.Target // parsed form of target, consumed by the name resolver
	authority    string
	dopts        dialOptions // options set at dial time; merged into every call, see combine in call.go
	csMgr        *connectivityStateManager // tracks the connectivity state of the connection

	balancerBuildOpts balancer.BuildOptions // not covered here
	blockingpicker    *pickerWrapper // picker used for load-balanced transport selection

	safeConfigSelector iresolver.SafeConfigSelector // not covered here

	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper // implements resolver.ClientConn (see ./resolver/resolver.go); wraps ClientConn for the resolver
	sc              *ServiceConfig
	conns           map[*addrConn]struct{} // the set of underlying connections
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	balancerWrapper *ccBalancerWrapper // wraps the load balancer
	retryThrottler  atomic.Value

	firstResolveEvent *grpcsync.Event

	channelzID int64 // channelz unique identification number
	czData     *channelzData

	lceMu               sync.Mutex // protects lastConnectionError
	lastConnectionError error
}
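
Since csMgr maintains the connectivity state, the exported GetState, Connect and WaitForStateChange methods on *ClientConn (all marked experimental in grpc-go) let a caller observe it. A small sketch, assuming a server is listening on localhost:50051:

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
)

func main() {
	conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()
	conn.Connect() // leave Idle and start connecting

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	for {
		s := conn.GetState() // Idle, Connecting, Ready, TransientFailure or Shutdown
		log.Printf("state: %v", s)
		if s == connectivity.Ready {
			return
		}
		// Block until the state moves away from s, or give up when ctx expires.
		if !conn.WaitForStateChange(ctx, s) {
			log.Fatal("timed out waiting for Ready")
		}
	}
}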

Calling pb.NewGreeterClient(conn) returns the generated client for this proto. Its implementation:

func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient {
	return &greeterClient{cc}
}

// ClientConnInterface defines the functions clients need to perform unary and
// streaming RPCs.  It is implemented by *ClientConn, and is only intended to
// be referenced by generated code.

type ClientConnInterface interface {
	// Invoke performs a unary RPC and returns after the response is received
	// into reply.
	Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
	// NewStream begins a streaming RPC.
	NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
}
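
Because *ClientConn implements this interface, the generated client is only a thin wrapper. The same unary call can be issued through ClientConn.Invoke directly; a quick sketch, reusing conn, pb and the imports from the opening example:

func sayHelloRaw(ctx context.Context, conn *grpc.ClientConn, name string) (string, error) {
	in := &pb.HelloRequest{Name: name}
	out := new(pb.HelloReply)
	// The method string is the fully-qualified proto path, exactly what the
	// generated SayHello passes.
	if err := conn.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out); err != nil {
		return "", err
	}
	return out.GetMessage(), nil
}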

Although ClientConn implements ClientConnInterface, the implementations are not all in one place.

ClientConn and ClientConnInterface are both defined in clientconn.go, but ClientConn's Invoke method is implemented in call.go and its NewStream method in stream.go.

Now consider c.SayHello(ctx, &pb.HelloRequest{Name: name}), which calls SayHello on the GreeterClient created above. Its implementation:

func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
	out := new(HelloReply)
	// c is the greeterClient initialized above; cc is the ClientConn created earlier.
	// Invoke means this is a unary call; jump to call.go to follow it.
	err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}
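
The opts ...grpc.CallOption parameter is the hook for per-call behavior; the options are merged with the dial-time defaults by combine in call.go. For example, the real grpc.WaitForReady, grpc.Header and grpc.Trailer options can be passed straight through the generated method. This snippet extends the opening example and additionally imports google.golang.org/grpc/metadata:

	var header, trailer metadata.MD
	r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name},
		grpc.WaitForReady(true), // queue the RPC until a transport is ready instead of failing fast
		grpc.Header(&header),    // capture the response header metadata
		grpc.Trailer(&trailer),  // capture the response trailer metadata
	)
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	log.Printf("Greeting: %s (header: %v)", r.GetMessage(), header)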

Next, the concrete implementation of Invoke:

func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
	// some code omitted
	return invoke(ctx, method, args, reply, cc, opts...)
}

func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
	// Create the ClientStream. newClientStream is shared with streaming RPCs;
	// the second parameter, a *StreamDesc, distinguishes unary from streaming.
	cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
	if err != nil {
		return err
	}
	// Send the request: cs is a grpc.clientStream, so this calls clientStream.SendMsg.
	if err := cs.SendMsg(req); err != nil {
		return err
	}
	// Receive the reply via clientStream.RecvMsg.
	return cs.RecvMsg(reply)
}

Following the newClientStream call (heavily condensed; mc and onCommit come from the omitted code):

func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
	// some code omitted
	var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
		return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
	}
	// some code omitted
	return newStream(ctx, func() {})
}

// Parameters to the stream constructor:
// desc *StreamDesc — distinguishes unary from streaming calls
// cc *ClientConn — the gRPC connection object
// opts ...CallOption — the per-call options passed in by the caller
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
	// some code omitted
	cs := &clientStream{
		callHdr:      callHdr,
		ctx:          ctx,
		methodConfig: &mc,
		opts:         opts,
		callInfo:     c,
		cc:           cc,
		desc:         desc,
		codec:        c.codec,
		cp:           cp,
		comp:         comp,
		cancel:       cancel,
		beginTime:    beginTime,
		firstAttempt: true,
		onCommit:     onCommit,
	}
	op := func(a *csAttempt) error { return a.newStream() }
	// The stream is created inside withRetry's for loop; the control flow is a bit convoluted.
	if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
		cs.finish(err)
		return nil, err
	}
	return cs, nil
}

Here is how withRetry works:

func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
	cs.mu.Lock()
	for {
		if cs.committed {
			cs.mu.Unlock()
			return op(cs.attempt)
		}
		a := cs.attempt // a is a pointer to the current attempt
		cs.mu.Unlock()
		err := op(a) // run op against the attempt; here op is `op := func(a *csAttempt) error { return a.newStream() }`
		cs.mu.Lock()
		if a != cs.attempt {
			// We started another attempt already.
			continue
		}
		if err == io.EOF {
			<-a.s.Done()
		}
		if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
			onSuccess()
			cs.mu.Unlock()
			return err
		}
		if err := cs.retryLocked(err); err != nil {
			cs.mu.Unlock()
			return err
		}
	}
}
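
The locking dance above is easier to see in a simplified standalone model. The sketch below is illustrative only, not grpc's actual code: run op against the current attempt outside the lock, commit on success, and replace the attempt and loop on failure.

package main

import (
	"errors"
	"fmt"
	"sync"
)

type attempt struct{ id int }

type stream struct {
	mu      sync.Mutex
	attempt *attempt
	tries   int
}

// retryLocked stands in for cs.retryLocked/newAttemptLocked: it either
// installs a fresh attempt or reports that the retry budget is exhausted.
func (s *stream) retryLocked() error {
	if s.tries >= 3 {
		return errors.New("retry budget exhausted")
	}
	s.tries++
	s.attempt = &attempt{id: s.tries}
	return nil
}

func (s *stream) withRetry(op func(*attempt) error, onSuccess func()) error {
	s.mu.Lock()
	for {
		a := s.attempt
		s.mu.Unlock()
		err := op(a) // run outside the lock, like the real code
		s.mu.Lock()
		if a != s.attempt {
			continue // another attempt replaced ours; retry op on the new one
		}
		if err == nil {
			onSuccess()
			s.mu.Unlock()
			return nil
		}
		if rerr := s.retryLocked(); rerr != nil {
			s.mu.Unlock()
			return rerr
		}
	}
}

func main() {
	s := &stream{attempt: &attempt{id: 0}}
	err := s.withRetry(func(a *attempt) error {
		if a.id < 2 { // fail the first two attempts
			return errors.New("transient failure")
		}
		return nil
	}, func() { fmt.Println("committed") })
	fmt.Println("result:", err)
}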

func (a *csAttempt) newStream() error {
	cs := a.cs
	cs.callHdr.PreviousAttempts = cs.numRetries
	s, err := a.t.NewStream(cs.ctx, cs.callHdr)
	if err != nil {
		if _, ok := err.(transport.PerformedIOError); ok {
			// Return without converting to an RPC error so retry code can
			// inspect.
			return err
		}
		return toRPCErr(err)
	}
	cs.attempt.s = s // the stream is finally set here, after the detour through withRetry
	cs.attempt.p = &parser{r: s}
	return nil
}

From the code above we now know how invoke initializes the stream. Next, the SendMsg and RecvMsg methods.

Both SendMsg and RecvMsg belong to the ClientStream interface, which also declares other methods used for the HTTP/2 exchange.

// ClientStream defines the client-side behavior of a streaming RPC.
//
// All errors returned from ClientStream methods are compatible with the
// status package.

type ClientStream interface {
	// Header returns the header metadata received from the server if there
	// is any. It blocks if the metadata is not ready to read.
	Header() (metadata.MD, error)
	// Trailer returns the trailer metadata from the server, if there is any.
	// It must only be called after stream.CloseAndRecv has returned, or
	// stream.Recv has returned a non-nil error (including io.EOF).
	Trailer() metadata.MD
	// CloseSend closes the send direction of the stream. It closes the stream
	// when non-nil error is met. It is also not safe to call CloseSend
	// concurrently with SendMsg.
	CloseSend() error
	// Context returns the context for this stream.
	//
	// It should not be called until after Header or RecvMsg has returned. Once
	// called, subsequent client-side retries are disabled.
	Context() context.Context
	// SendMsg is generally called by generated code. On error, SendMsg aborts
	// the stream. If the error was generated by the client, the status is
	// returned directly; otherwise, io.EOF is returned and the status of
	// the stream may be discovered using RecvMsg.
	//
	// SendMsg blocks until:
	//   - There is sufficient flow control to schedule m with the transport, or
	//   - The stream is done, or
	//   - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the server. An
	// untimely stream closure may result in lost messages. To ensure delivery,
	// users should ensure the RPC completed successfully using RecvMsg.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines. It is also
	// not safe to call CloseSend concurrently with SendMsg.
	SendMsg(m interface{}) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the stream completes successfully. On
	// any other error, the stream is aborted and the error contains the RPC
	// status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m interface{}) error
}
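
That concurrency contract — exactly one goroutine on SendMsg, one on RecvMsg, and CloseSend only from the sender's goroutine — shapes how a stream is driven by hand. The sketch below uses the real exported grpc.NewClientStream helper against a hypothetical bidirectional method /example.Chat/Chat with hypothetical examplepb message types; the goroutine layout is the point, not the service:

func chat(ctx context.Context, conn *grpc.ClientConn, reqs []*examplepb.ChatRequest) error {
	desc := &grpc.StreamDesc{StreamName: "Chat", ClientStreams: true, ServerStreams: true}
	cs, err := grpc.NewClientStream(ctx, desc, conn, "/example.Chat/Chat") // hypothetical method
	if err != nil {
		return err
	}
	sendErr := make(chan error, 1)
	go func() { // the single sender goroutine
		for _, m := range reqs {
			if err := cs.SendMsg(m); err != nil {
				sendErr <- err // on io.EOF, RecvMsg will surface the real status
				return
			}
		}
		sendErr <- cs.CloseSend() // same goroutine as SendMsg, so this is safe
	}()
	for { // this goroutine is the single receiver
		reply := new(examplepb.ChatReply) // hypothetical message type
		if err := cs.RecvMsg(reply); err != nil {
			if err == io.EOF {
				return <-sendErr // the stream completed cleanly
			}
			return err // stream aborted; err carries the RPC status
		}
	}
}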

Now the SendMsg method that invoke calls:

func (cs *clientStream) SendMsg(m interface{}) (err error) {
	// some code omitted

	// Marshal, compress and frame the message.
	hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
	if err != nil {
		return err
	}

	msgBytes := data // Store the pointer before setting to nil. For binary logging.
	op := func(a *csAttempt) error {
		// Hand the message to the transport.
		err := a.sendMsg(m, hdr, payload, data)
		// nil out the message and uncomp when replaying; they are only needed for
		// stats which is disabled for subsequent attempts.
		m, data = nil, nil
		return err
	}
	// Send through withRetry so a failed attempt can be replayed.
	err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
	// some code omitted
}

The Write call inside (a *csAttempt).sendMsg is a method of the ClientTransport interface, defined in internal/transport/transport.go.

func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
	cs := a.cs
	if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
        // ...
	}
	// ...
	return nil
}

// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
// should proceed only if Write returns nil.
// Write packs the data into HTTP/2 data frames and sends them out; note the controlBuf.put call on the last line.
func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
	if opts.Last {
		// If it's the last message, update stream state.
		if !s.compareAndSwapState(streamActive, streamWriteDone) {
			return errStreamDone
		}
	} else if s.getState() != streamActive {
		return errStreamDone
	}
	df := &dataFrame{
		streamID:  s.id,
		endStream: opts.Last,
		h:         hdr,
		d:         data,
	}
	if hdr != nil || data != nil { // If it's not an empty data frame, check quota.
		if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
			return err
		}
	}
	return t.controlBuf.put(df)
}

// controlBuffer is a way to pass information to loopy.
// Information is passed as specific struct types called control frames.
// A control frame not only represents data, messages or headers to be sent out
// but can also be used to instruct loopy to update its internal state.
// It shouldn't be confused with an HTTP2 frame, although some of the control frames
// like dataFrame and headerFrame do go out on wire as HTTP2 frames.
type controlBuffer struct {
	ch              chan struct{}
	done            <-chan struct{}
	mu              sync.Mutex
	consumerWaiting bool
	list            *itemList
	err             error

	// transportResponseFrames counts the number of queued items that represent
	// the response of an action initiated by the peer.  trfChan is created
	// when transportResponseFrames >= maxQueuedTransportResponseFrames and is
	// closed and nilled when transportResponseFrames drops below the
	// threshold.  Both fields are protected by mu.
	transportResponseFrames int
	trfChan                 atomic.Value // *chan struct{}
}
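
The way controlBuffer hands frames to the loopy writer boils down to a mutex-guarded list plus a one-slot wake-up channel. The following is a simplified sketch of that pattern — not grpc's actual implementation, which additionally throttles on transportResponseFrames:

package main

import (
	"fmt"
	"sync"
)

// controlBuf is a stripped-down model of grpc's controlBuffer: put
// appends an item and wakes the consumer if it is parked; get returns
// the next item, blocking when the list is empty.
type controlBuf struct {
	mu      sync.Mutex
	items   []interface{}
	ch      chan struct{} // capacity 1: a single wake-up token
	waiting bool
}

func newControlBuf() *controlBuf {
	return &controlBuf{ch: make(chan struct{}, 1)}
}

func (b *controlBuf) put(it interface{}) {
	b.mu.Lock()
	b.items = append(b.items, it)
	wake := b.waiting
	b.waiting = false
	b.mu.Unlock()
	if wake {
		b.ch <- struct{}{} // the consumer is parked; wake it
	}
}

func (b *controlBuf) get() interface{} {
	for {
		b.mu.Lock()
		if len(b.items) > 0 {
			it := b.items[0]
			b.items = b.items[1:]
			b.mu.Unlock()
			return it
		}
		b.waiting = true
		b.mu.Unlock()
		<-b.ch // park until a producer signals
	}
}

func main() {
	b := newControlBuf()
	go func() {
		for i := 0; i < 3; i++ {
			b.put(i)
		}
	}()
	for i := 0; i < 3; i++ {
		fmt.Println("got:", b.get())
	}
}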

So how does the err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) }) path end up with a transport that can receive messages? The call chain is long, so let's trace it step by step (each function below is heavily condensed):

func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
	for {
		// some code omitted; retryLocked is the important call here — what does it do?
		if err := cs.retryLocked(err); err != nil {
			cs.mu.Unlock()
			return err
		}
	}
}

func (cs *clientStream) retryLocked(lastErr error) error {
	for {
		// some code omitted; the interesting part is newAttemptLocked
		if err := cs.newAttemptLocked(nil, nil); err != nil {
			return err
		}
	}
}

func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) {
	newAttempt := &csAttempt{
		cs:           cs,
		dc:           cs.cc.dopts.dc,
		statsHandler: sh,
		trInfo:       trInfo,
	}
	// Each attempt fetches a transport via getTransport; cc is the ClientConn,
	// and this is where load balancing enters the picture.
	t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
	if err != nil {
		return err
	}
	if trInfo != nil {
		trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())
	}
	newAttempt.t = t
	newAttempt.done = done
	cs.attempt = newAttempt
	return nil
}

func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
	// Focus on the pick call.
	t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
		Ctx:            ctx,
		FullMethodName: method,
	})
	// some code omitted
}

func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {
	// some code omitted; inside the pick loop, select a transport that is ready
	if t, ok := acw.getAddrConn().getReadyTransport(); ok {
		if channelz.IsOn() {
			return t, doneChannelzWrapper(acw, pickResult.Done), nil
		}
		return t, pickResult.Done, nil
	}
	// some code omitted
}

func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
	// some code omitted: return the transport if one is already ready;
	// otherwise kick off connection establishment
	ac.connect()
	return nil, false
}

func (ac *addrConn) connect() error {
	// connect asynchronously
	go ac.resetTransport()
	return nil
}

func (ac *addrConn) resetTransport() {
	for i := 0; ; i++ {
		// Try every resolved address; return as soon as one connects.
		newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
		// some code omitted
	}
}
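
resetTransport waits with exponential backoff between rounds of tryAllAddrs. Those backoff parameters are tunable at dial time through the real grpc.WithConnectParams option; the values below are arbitrary examples, and google.golang.org/grpc/backoff must be imported:

	conn, err := grpc.Dial(address,
		grpc.WithInsecure(),
		grpc.WithConnectParams(grpc.ConnectParams{
			Backoff: backoff.Config{
				BaseDelay:  100 * time.Millisecond, // delay before the first retry
				Multiplier: 1.6,                    // growth factor per round
				Jitter:     0.2,                    // randomization to avoid thundering herds
				MaxDelay:   10 * time.Second,       // upper bound on the delay
			},
			MinConnectTimeout: 5 * time.Second, // minimum time granted to one connection attempt
		}),
	)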

func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
	// NewClientTransport does the actual work.
	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onPrefaceReceipt, onGoAway, onClose)
	// some code omitted
}

func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
	return newHTTP2Client(connectCtx, ctx, addr, opts, onPrefaceReceipt, onGoAway, onClose)
}

// Here is the key method. There is a lot of detail; focus on how incoming messages are received.
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
	// some code omitted
	// Start the reader goroutine for incoming message. Each transport has
	// a dedicated goroutine which reads HTTP2 frame from network. Then it
	// dispatches the frame to the corresponding stream entity.
	go t.reader()
}

// Reads HTTP/2 frames; this is the counterpart of the server-side handler.
func (t *http2Client) reader() {
	defer close(t.readerDone)
	// Check the validity of server preface.
	frame, err := t.framer.fr.ReadFrame()
	if err != nil {
		t.Close() // this kicks off resetTransport, so must be last before return
		return
	}
	t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
	if t.keepaliveEnabled {
		atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
	}
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		t.Close() // this kicks off resetTransport, so must be last before return
		return
	}
	t.onPrefaceReceipt()
	t.handleSettings(sf, true)

	// loop to keep reading incoming messages on this transport.
	for {
		t.controlBuf.throttle()
		frame, err := t.framer.fr.ReadFrame()
		if t.keepaliveEnabled {
			atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
		}
		if err != nil {
			// Abort an active stream if the http2.Framer returns a
			// http2.StreamError. This can happen only if the server's response
			// is malformed http2.
			if se, ok := err.(http2.StreamError); ok {
				t.mu.Lock()
				s := t.activeStreams[se.StreamID]
				t.mu.Unlock()
				if s != nil {
					// use error detail to provide better err message
					code := http2ErrConvTab[se.Code]
					errorDetail := t.framer.fr.ErrorDetail()
					var msg string
					if errorDetail != nil {
						msg = errorDetail.Error()
					} else {
						msg = "received invalid frame"
					}
					t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
				}
				continue
			} else {
				// Transport error.
				t.Close()
				return
			}
		}
		switch frame := frame.(type) {
		case *http2.MetaHeadersFrame:
			t.operateHeaders(frame)
		case *http2.DataFrame:
			t.handleData(frame)
		case *http2.RSTStreamFrame:
			t.handleRSTStream(frame)
		case *http2.SettingsFrame:
			t.handleSettings(frame, false)
		case *http2.PingFrame:
			t.handlePing(frame)
		case *http2.GoAwayFrame:
			t.handleGoAway(frame)
		case *http2.WindowUpdateFrame:
			t.handleWindowUpdate(frame)
		default:
			if logger.V(logLevel) {
				logger.Errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
			}
		}
	}
}
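
The shape of reader — a blocking ReadFrame loop feeding a type switch — can be reproduced with the golang.org/x/net/http2 Framer that grpc wraps. A minimal sketch, not grpc code (it assumes imports fmt, net and golang.org/x/net/http2; note a plain Framer yields http2.HeadersFrame, whereas grpc configures it to produce MetaHeadersFrame):

func readLoop(conn net.Conn) error {
	fr := http2.NewFramer(conn, conn) // (writer, reader)
	for {
		frame, err := fr.ReadFrame()
		if err != nil {
			return err // transport-level error; the caller tears the connection down
		}
		switch f := frame.(type) {
		case *http2.DataFrame:
			fmt.Printf("DATA stream=%d len=%d\n", f.StreamID, len(f.Data()))
		case *http2.HeadersFrame:
			fmt.Printf("HEADERS stream=%d\n", f.StreamID)
		case *http2.SettingsFrame:
			fmt.Println("SETTINGS")
		case *http2.GoAwayFrame:
			fmt.Printf("GOAWAY lastStream=%d\n", f.LastStreamID)
		default:
			fmt.Printf("unhandled frame type %T\n", f)
		}
	}
}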

[Figure: flow diagram of the client-side HTTP/2 handling, from the original post]

Next, let's look at the RecvMsg call path.

// RecvMsg also goes through withRetry, for the same retry support.
func (cs *clientStream) RecvMsg(m interface{}) error {
	// some code omitted; focus on a.recvMsg(m, recvInfo)
	err := cs.withRetry(func(a *csAttempt) error {
		return a.recvMsg(m, recvInfo)
	}, cs.commitAttemptLocked)
	return err
}

recvMsg is implemented as follows (again condensed):

func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
	// some code omitted
	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
}

func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {
	// Receive and decompress.
	d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
	// some code omitted
}

func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) {
	// Receive the raw message.
	pf, d, err := p.recvMsg(maxReceiveMessageSize)
	// some code omitted
}


// recvMsg reads a complete gRPC message from the stream and returns the
// message and its payload format; the caller owns the returned buffer.
// Possible errors:
//   io.EOF, when there are no more messages
//   io.ErrUnexpectedEOF
//   an error of type transport.ConnectionError
//   or an error from the status package
func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) {
	// Read the 5-byte message header.
	if _, err := p.r.Read(p.header[:]); err != nil {
		return 0, nil, err
	}
	// some code omitted: pf and length are parsed out of p.header
	// Read the message body.
	msg = make([]byte, int(length))
	if _, err := p.r.Read(msg); err != nil {
		if err == io.EOF {
			err = io.ErrUnexpectedEOF
		}
		return 0, nil, err
	}
	return pf, msg, nil
}
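
p.header is the 5-byte gRPC length prefix: a 1-byte compressed flag followed by a 4-byte big-endian length. Here is a standalone sketch of the parsing that the elided part of recvMsg performs; it uses io.ReadFull on a generic io.Reader, whereas the real parser relies on the transport stream's read semantics:

func readGRPCMessage(r io.Reader, maxReceiveMessageSize int) (compressed bool, msg []byte, err error) {
	// Wire format: 1-byte compressed flag, 4-byte big-endian length, then the payload.
	var header [5]byte
	if _, err := io.ReadFull(r, header[:]); err != nil {
		return false, nil, err // io.EOF here means: no more messages
	}
	compressed = header[0] == 1
	length := binary.BigEndian.Uint32(header[1:])
	if int(length) > maxReceiveMessageSize {
		return false, nil, fmt.Errorf("message of %d bytes exceeds limit %d", length, maxReceiveMessageSize)
	}
	msg = make([]byte, length)
	if _, err := io.ReadFull(r, msg); err != nil {
		if err == io.EOF {
			err = io.ErrUnexpectedEOF // header was read but the body is truncated
		}
		return false, nil, err
	}
	return compressed, msg, nil
}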

That covers the main flow of the grpc-go client source code.
