grpc-go客户端源码分析
Posted inet_ygssoftware
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了grpc-go客户端源码分析相关的知识,希望对你有一定的参考价值。
grpc-go客户端源码分析
代码讲解基于v1.37.0版本。
和grpc-go服务端源码分析一样,我们先看一段示例代码,
const (
address = "localhost:50051"
defaultName = "world"
)
func main() {
// Set up a connection to the server.
conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)
// Contact the server and print out its response.
name := defaultName
if len(os.Args) > 1 {
name = os.Args[1]
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("Greeting: %s", r.GetMessage())
}
先调用grpc.Dial生成一个grpc.ClientConn对象,具体的初始化操作在grpc.DialContext方法中。
DialContext首先初始化空对象ClientConn,然后判断opts …DialOption数据是否存在,如果存在就执行传入的函数并设置特定属性。
// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}
代码的关键点在于创建ClientConn对象,对应结构体包括的字段。
// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// ClientConn是用于RPC通信的虚拟连接
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// ClientConn可以选择连接终端的数量同时选择负载均衡逻辑
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
// ClientConn封装了一系列的功能,包括名称解析,TCP连接建立(包括重试和退避策略),TLS,重新名字解析和重联机制。
type ClientConn struct {
ctx context.Context
cancel context.CancelFunc
target string
parsedTarget resolver.Target // 负载均衡选择
authority string
dopts dialOptions // 初始化可设置选项,在每一次请求会带上,看call.go中的combine方法
csMgr *connectivityStateManager // 连接状态维护
balancerBuildOpts balancer.BuildOptions // 忽略
blockingpicker *pickerWrapper // 负载均衡设置
safeConfigSelector iresolver.SafeConfigSelector // 忽略
mu sync.RWMutex
resolverWrapper *ccResolverWrapper // 实现了resolver.ClientConn,位于./resolver/resolver.go中,ClientConn的上层包装器(疑惑?)
sc *ServiceConfig
conns map[*addrConn]struct{} // 存放连接的地方
// Keepalive parameter can be updated if a GoAway is received.
mkp keepalive.ClientParameters
curBalancerName string
balancerWrapper *ccBalancerWrapper // 负载均衡器上的包装器(疑惑?)
retryThrottler atomic.Value
firstResolveEvent *grpcsync.Event
channelzID int64 // channelz unique identification number
czData *channelzData
lceMu sync.Mutex // protects lastConnectionError
lastConnectionError error
}
调用pb.NewGreeterClient(conn)返回当前PB的Client对象,具体的实现代码:
func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient {
return &greeterClient{cc}
}
// ClientConnInterface defines the functions clients need to perform unary and
// streaming RPCs. It is implemented by *ClientConn, and is only intended to
// be referenced by generated code.
// ClientConnInterface定义了执行RPC方法(包括unary和streaming)的对象需要实现的函数
// ClientConn实现了该interface{},只希望被自动生成的代码调用。
type ClientConnInterface interface {
// Invoke performs a unary RPC and returns after the response is received
// into reply.
// Invoke执行unary类型的请求并返回数据
Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
// NewStream begins a streaming RPC.
// NewStream开启streaming RPC.
NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
}
虽然ClientConn实现了ClientConnInterface,然而实现代码并没有放到一起。
ClientConn和ClientConnInterface定义在clientconn.go文件中。ClientConn实现Invoke方法是在call.go文件中。ClientConn实现NewStream方法是在stream.go文件中。
说说c.SayHello(ctx, &pb.HelloRequest{Name: name}),使用之前定义的GreeterClient来调用SayHello方法,具体代码实现如下,
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
out := new(HelloReply)
// c是当前初始化的greetClient,cc是之前初始化好的ClientConn,Invoke表示使用unary方法,接下来就可以跳转到call.go文件中查看。
err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
接下来看Invoke方法具体实现,
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
// 省略一些代码
return invoke(ctx, method, args, reply, cc, opts...)
}
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
// 创建ClientStream,newClientStream这个方法unary方法也会调用,使用第二个参数StreamDesc来区分。
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
if err != nil {
return err
}
// 发消息,cs是grpc.clientStream对象,调用clientStream的SendMsg方法
if err := cs.SendMsg(req); err != nil {
return err
}
// 收消息,cs是grpc.clientStream对象,调用clientStream的RecvMsg方法
return cs.RecvMsg(reply)
}
对于newClientStream的调用,
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
}
return newStream(ctx, func() {})
}
// 初始化stream传入参数说明,
// desc *StreamDesc,决定调用unary还是stream
// cc *ClientConn,grpc连接对象
// opts ...CallOption,初始化对象传入的各种参数
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
// 省略部分代码
cs := &clientStream{
callHdr: callHdr,
ctx: ctx,
methodConfig: &mc,
opts: opts,
callInfo: c,
cc: cc,
desc: desc,
codec: c.codec,
cp: cp,
comp: comp,
cancel: cancel,
beginTime: beginTime,
firstAttempt: true,
onCommit: onCommit,
}
op := func(a *csAttempt) error { return a.newStream() }
// 使用forloop初始化stream,这个代码写的比较绕
if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
cs.finish(err)
return nil, err
}
return cs, nil
}
看看withRetry实现方式,
func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
cs.mu.Lock()
for {
if cs.committed {
cs.mu.Unlock()
return op(cs.attempt)
}
a := cs.attempt // 这里是指针
cs.mu.Unlock()
err := op(a) // 将指针传入op函数中,op是`op := func(a *csAttempt) error { return a.newStream() }`
cs.mu.Lock()
if a != cs.attempt {
// We started another attempt already.
continue
}
if err == io.EOF {
<-a.s.Done()
}
if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
onSuccess()
cs.mu.Unlock()
return err
}
if err := cs.retryLocked(err); err != nil {
cs.mu.Unlock()
return err
}
}
}
func (a *csAttempt) newStream() error {
cs := a.cs
cs.callHdr.PreviousAttempts = cs.numRetries
s, err := a.t.NewStream(cs.ctx, cs.callHdr)
if err != nil {
if _, ok := err.(transport.PerformedIOError); ok {
// Return without converting to an RPC error so retry code can
// inspect.
return err
}
return toRPCErr(err)
}
cs.attempt.s = s // 在这里设置stream,绕了一路
cs.attempt.p = &parser{r: s}
return nil
}
通过上面代码分析,我们已经知道invoke方法中stream的初始化逻辑,接下来看看SendMsg和RecvMsg方法。
首先SendMsg和RecvMsg方法都属于ClientStream接口,该接口中还有其他的方法,主要用于http2通信。
// ClientStream defines the client-side behavior of a streaming RPC.
// ClientStream定义了rpc通信的客户端行为。
//
// All errors returned from ClientStream methods are compatible with the
// status package.
// 所有ClientStream方法返回的错误都适用于status包中的定义。
type ClientStream interface {
// Header returns the header metadata received from the server if there
// is any. It blocks if the metadata is not ready to read.
// Header会返回服务端的header元数据,如果元数据还未满足条件会一直阻塞。
Header() (metadata.MD, error)
// Trailer returns the trailer metadata from the server, if there is any.
// It must only be called after stream.CloseAndRecv has returned, or
// stream.Recv has returned a non-nil error (including io.EOF).
// Trailer会返回trailer元数据,只会在stream.CloseAndRecv返回或者stream.Recv返回错误的时候才会被调用。
Trailer() metadata.MD
// CloseSend closes the send direction of the stream. It closes the stream
// when non-nil error is met. It is also not safe to call CloseSend
// concurrently with SendMsg.
// CloseSend用于关闭发送方的流,如果遇到非空错误,也会关闭。CloseSend和SendMsg不是并发安全的。
CloseSend() error
// Context returns the context for this stream.
//
// It should not be called until after Header or RecvMsg has returned. Once
// called, subsequent client-side retries are disabled.
// Context 返回了stream的上下文,在Header或者RecvMsg返回之后不应该被调用。
// 一旦被点用,后续的客户端重试都无效了。
Context() context.Context
// SendMsg is generally called by generated code. On error, SendMsg aborts
// the stream. If the error was generated by the client, the status is
// returned directly; otherwise, io.EOF is returned and the status of
// the stream may be discovered using RecvMsg.
// SendMsg通常被自动生成的代码调用。如果遇到错误,SendMsg就会停止stream。
// 如果错误是因为客户端产生的,状态会立即返回,否则返回io.EOF,使用RecvMsg能够发现stream的状态。
//
// SendMsg blocks until:
// - There is sufficient flow control to schedule m with the transport, or
// - The stream is done, or
// - The stream breaks.
// SendMsg会阻塞的三种场景,流被终止,流完成,存在足够流控制
// SendMsg does not wait until the message is received by the server. An
// untimely stream closure may result in lost messages. To ensure delivery,
// users should ensure the RPC completed successfully using RecvMsg.
// SendMsg不会阻塞直到服务器接收到完整的数据,过早的流关闭会导致消息丢失。
// 为了保证接受率,用户应该使用RecvMsg保证RPC成功结束。
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not safe
// to call SendMsg on the same stream in different goroutines. It is also
// not safe to call CloseSend concurrently with SendMsg.
// 在不同的协程中并发调用SendMsg和RecvMsg是可以的,但是如果在不同的协程中同时调用SendMsg不是并发安全的。
// CloseSend和SendMsg也不是并发安全的。
SendMsg(m interface{}) error
// RecvMsg blocks until it receives a message into m or the stream is
// done. It returns io.EOF when the stream completes successfully. On
// any other error, the stream is aborted and the error contains the RPC
// status.
// RecvMsg会一直阻塞直到接收到所有的数据或者stream停止了。如果流成功完成了,会返回io.EOF。
// 对于其他错误,流会被终止,并且错误会带有RPC状态。
//
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not
// safe to call RecvMsg on the same stream in different goroutines.
// SendMsg和RecvMsg调用是并发安全的,在不同的协程中调用RecvMsg不是并发安全。
RecvMsg(m interface{}) error
}
分析一下invoke方法调用的SendMsg方法,
func (cs *clientStream) SendMsg(m interface{}) (err error) {
// 省略部分代码
// 处理一下消息
hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
if err != nil {
return err
}
msgBytes := data // Store the pointer before setting to nil. For binary logging.
op := func(a *csAttempt) error {
// 发送消息
err := a.sendMsg(m, hdr, payload, data)
// nil out the message and uncomp when replaying; they are only needed for
// stats which is disabled for subsequent attempts.
m, data = nil, nil
return err
}
// 采用重试的方式确保消息发送出去
err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
// 省略部分代码
}
(a *csAttempt) sendMsg发送消息中的Write属于type ClientTransport interface{}中的方法,在internal/transport/transport.go文件中。
func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
cs := a.cs
if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
// ...
}
// ...
return nil
}
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
// should proceed only if Write returns nil.
// Writ方法将数据弄成数据帧的方式,然后发送出去,关注代码最后一句controlBuf.put功能。
func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
if opts.Last {
// If it's the last message, update stream state.
if !s.compareAndSwapState(streamActive, streamWriteDone) {
return errStreamDone
}
} else if s.getState() != streamActive {
return errStreamDone
}
df := &dataFrame{
streamID: s.id,
endStream: opts.Last,
h: hdr,
d: data,
}
if hdr != nil || data != nil { // If it's not an empty data frame, check quota.
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
return err
}
}
return t.controlBuf.put(df)
}
// controlBuffer is a way to pass information to loopy.
// Information is passed as specific struct types called control frames.
// A control frame not only represents data, messages or headers to be sent out
// but can also be used to instruct loopy to update its internal state.
// It shouldn't be confused with an HTTP2 frame, although some of the control frames
// like dataFrame and headerFrame do go out on wire as HTTP2 frames.
// controlBuffer是将信息传递给loopy的一种方式,通过特殊的结构体形式传递的信息称为控制control frames。control frame不仅仅代表数据,消息,消息头,还可以通知loopy来更新内部状态。
// 注意不能和http2的帧搞混,尽管某些像dataFrame,headerFrame。
type controlBuffer struct {
ch chan struct{}
done <-chan struct{}
mu sync.Mutex
consumerWaiting bool
list *itemList
err error
// transportResponseFrames counts the number of queued items that represent
// the response of an action initiated by the peer. trfChan is created
// when transportResponseFrames >= maxQueuedTransportResponseFrames and is
// closed and nilled when transportResponseFrames drops below the
// threshold. Both fields are protected by mu.
transportResponseFrames int
trfChan atomic.Value // *chan struct{}
}
关于err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })如何接收消息,这里详细说明一下,调用流程较长。
func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
for {
// retryLocked很重要,到底做什么呢?
if err := cs.retryLocked(err); err != nil {
cs.mu.Unlock()
return err
}
}
}
func (cs *clientStream) retryLocked(lastErr error) error {
for {
// 看newAttemptLocked
if err := cs.newAttemptLocked(nil, nil); err != nil {
return err
}
}
}
func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) {
newAttempt := &csAttempt{
cs: cs,
dc: cs.cc.dopts.dc,
statsHandler: sh,
trInfo: trInfo,
}
// 每次getTransport获取使用的连接,cc是ClientConn对象,涉及负载均衡了。
t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
if err != nil {
return err
}
if trInfo != nil {
trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())
}
newAttempt.t = t
newAttempt.done = done
cs.attempt = newAttempt
return nil
}
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
// 关注pick方法
t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
Ctx: ctx,
FullMethodName: method,
})
}
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {
// 选择满足条件的transport
if t, ok := acw.getAddrConn().getReadyTransport(); ok {
if channelz.IsOn() {
return t, doneChannelzWrapper(acw, pickResult.Done), nil
}
return t, pickResult.Done, nil
}
}
}
func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
// 创建连接
ac.connect()
return nil, false
}
func (ac *addrConn) connect() error {
// 异步连接
go ac.resetTransport()
return nil
}
func (ac *addrConn) resetTransport() {
for i := 0; ; i++ {
// 创建连接,如果有一个创建成功,返回
newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
}
}
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
// NewClientTransport创建
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onPrefaceReceipt, onGoAway, onClose)
}
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
return newHTTP2Client(connectCtx, ctx, addr, opts, onPrefaceReceipt, onGoAway, onClose)
}
// 最关键的方法来了,太多的细节,在这里关注如何接受incoming消息
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
// Start the reader goroutine for incoming message. Each transport has
// a dedicated goroutine which reads HTTP2 frame from network. Then it
// dispatches the frame to the corresponding stream entity.
go t.reader()
}
// 处理http2数据和server端对应。
func (t *http2Client) reader() {
defer close(t.readerDone)
// Check the validity of server preface.
frame, err := t.framer.fr.ReadFrame()
if err != nil {
t.Close() // this kicks off resetTransport, so must be last before return
return
}
t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
if t.keepaliveEnabled {
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
}
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
t.Close() // this kicks off resetTransport, so must be last before return
return
}
t.onPrefaceReceipt()
t.handleSettings(sf, true)
// loop to keep reading incoming messages on this transport.
for {
t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
if t.keepaliveEnabled {
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
}
if err != nil {
// Abort an active stream if the http2.Framer returns a
// http2.StreamError. This can happen only if the server's response
// is malformed http2.
if se, ok := err.(http2.StreamError); ok {
t.mu.Lock()
s := t.activeStreams[se.StreamID]
t.mu.Unlock()
if s != nil {
// use error detail to provide better err message
code := http2ErrConvTab[se.Code]
errorDetail := t.framer.fr.ErrorDetail()
var msg string
if errorDetail != nil {
msg = errorDetail.Error()
} else {
msg = "received invalid frame"
}
t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
}
continue
} else {
// Transport error.
t.Close()
return
}
}
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
t.operateHeaders(frame)
case *http2.DataFrame:
t.handleData(frame)
case *http2.RSTStreamFrame:
t.handleRSTStream(frame)
case *http2.SettingsFrame:
t.handleSettings(frame, false)
case *http2.PingFrame:
t.handlePing(frame)
case *http2.GoAwayFrame:
t.handleGoAway(frame)
case *http2.WindowUpdateFrame:
t.handleWindowUpdate(frame)
default:
if logger.V(logLevel) {
logger.Errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
}
}
}
}
客户端http2流程图如下,
接下来看看RecvMsg的调用逻辑。
// 当面使用了withRetry方法,用于重试
func (cs *clientStream) RecvMsg(m interface{}) error {
// 省略一些代码,关注a.recvMsg(m, recvInfo)代码
err := cs.withRetry(func(a *csAttempt) error {
return a.recvMsg(m, recvInfo)
}, cs.commitAttemptLocked)
return err
}
recvMsg实现在下面,
func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
// 省略一些代码
err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
}
func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {
// 接收和解压缩
d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
}
func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) {
// 接收
pf, d, err := p.recvMsg(maxReceiveMessageSize)
}
// 从stream中读出完整的gRPC消息,返回消息和payload形式,调用者管理返回的消息内存。
// 如果存在错误,可能的错误是,
// io.EOF,当没有消息的时候
// io.ErrUnexpectedEOF
// of type transport.ConnectionError
// 或者status包中定义的错误。
func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) {
// 读请求头
if _, err := p.r.Read(p.header[:]); err != nil {
return 0, nil, err
}
// 读消息体
msg = make([]byte, int(length))
if _, err := p.r.Read(msg); err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
return 0, nil, err
}
return pf, msg, nil
}
以上是关于grpc-go客户端源码分析的主要内容,如果未能解决你的问题,请参考以下文章