go rpc 源码分析

Posted Golang语言社区

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了go rpc 源码分析相关的知识,希望对你有一定的参考价值。

1.概述

go 源码中带了rpc框架,以相对精简的当时方式实现了rpc功能,目前源码中的rpc官方已经宣布不再添加新功能,并推荐使用grpc.
作为go标准库中rpc框架,还是有很多地方值得借鉴及学习,这里将从源码角度分析go原生rpc框架,以及分享一些在使用过程中遇到的坑.

2.server端

server端主要分为两个步骤,首先进行方法注册,通过反射处理将方法取出,并存到map中.然后是网络调用,主要是监听端口,读取数据包,解码请求
调用反射处理后的方法,将返回值编码,返回给客户端.

2.1 方法注册

go rpc 源码分析

2.1.1 Register
// Register publishes the receiver's methods in the DefaultServer.
func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }
// RegisterName is like Register but uses the provided name for the type
// instead of the receiver's concrete type.
func RegisterName(name string, rcvr interface{}) error {
   return DefaultServer.RegisterName(name, rcvr)
}

如上,方法注册的入口函数有两个,分别为Register以及RegisterName,这里interface{}通常是带方法的对象.如果想要自定义方法的接收对象,则可以使用RegisterName.

2.1.2 反射处理过程
type methodType struct {
   sync.Mutex // protects counters
   method     reflect.Method    //反射后的函数
   ArgType    reflect.Type      //请求参数的反射值
   ReplyType  reflect.Type      //返回参数的反射值
   numCalls   uint              //调用次数
}
type service struct {
   name   string                 // 服务名,这里通常为register时的对象名或自定义对象名
   rcvr   reflect.Value          // 服务的接收者的反射值
   typ    reflect.Type           // 接收者的类型
   method map[string]*methodType // 对象的所有方法的反射结果.
}

反射处理过程,其实就是将对象以及对象的方法,通过反射生成上面的结构,如注册Arith.Multiply(xx,xx) error 这样的对象时,生成的结构为 map["Arith"]service, service 中ethod为 map["Multiply"]methodType.

几个关键代码如下:

生成service对象

func (server *Server) register(rcvr interface{}, name string, useName bool) error {
   //生成service
   s := new(service)
   s.typ = reflect.TypeOf(rcvr)
   s.rcvr = reflect.ValueOf(rcvr)
   sname := reflect.Indirect(s.rcvr).Type().Name()
   ....
   s.name = sname
   // 通过suitableMethods将对象的方法转换成map[string]*methodType结构
   s.method = suitableMethods(s.typ, true)
   ....
   //service存储为键值对
   if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
       return errors.New("rpc: service already defined: " + sname)
   }
   return nil
}

生成 map[string] *methodType

func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
   methods := make(map[string]*methodType)
   //通过反射,遍历所有的方法
   for m := 0; m < typ.NumMethod(); m++ {
       method := typ.Method(m)
       mtype := method.Type
       mname := method.Name
       // Method must be exported.
       if method.PkgPath != "" {
           continue
       }
       // Method needs three ins: receiver, *args, *reply.
       if mtype.NumIn() != 3 {
           if reportErr {
               log.Println("method", mname, "has wrong number of ins:", mtype.NumIn())
           }
           continue
       }
       //取出请求参数类型
       argType := mtype.In(1)
       ...
       // 取出响应参数类型,响应参数必须为指针
       replyType := mtype.In(2)
       if replyType.Kind() != reflect.Ptr {
           if reportErr {
               log.Println("method", mname, "reply type not a pointer:", replyType)
           }
           continue
       }
       ...
       // 去除函数的返回值,函数的返回值必须为error.
       if returnType := mtype.Out(0); returnType != typeOfError {
           if reportErr {
               log.Println("method", mname, "returns", returnType.String(), "not error")
           }
           continue
       }
       //将方法存储成key-value
       methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
   }
   return methods
}

2.2 网络调用

// Request 每次rpc调用的请求的头部分
type Request struct {
   ServiceMethod string   // 格式为: "Service.Method"
   Seq           uint64   // 客户端生成的序列号
   next          *Request // server端保持的链表
}
// Response 每次rpc调用的响应的头部分
type Response struct {
   ServiceMethod string    // 对应请求部分的 ServiceMethod
   Seq           uint64    // 对应请求部分的 Seq
   Error         string    // 错误
   next          *Response // server端保持的链表
}

如上,网络调用主要用到上面的两个结构体,分别是请求参数以及返回参数,通过编解码器(gob/json)实现二进制到结构体的相互转换.主要涉及到下面几个步骤:

go rpc 源码分析

关键代码如下:
取出请求,并得到相应函数的调用参数

func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
   // Grab the request header.
   req = server.getRequest()
   //编码器读取生成请求
   err = codec.ReadRequestHeader(req)
   if err != nil {
       //错误处理
       ...
       return
   }
   keepReading = true
   //取出服务名以及方法名
   dot := strings.LastIndex(req.ServiceMethod, ".")
   if dot < 0 {
       err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
       return
   }
   serviceName := req.ServiceMethod[:dot]
   methodName := req.ServiceMethod[dot+1:]
   //从注册时生成的map中查询出相应的方法的结构
   svci, ok := server.serviceMap.Load(serviceName)
   if !ok {
       err = errors.New("rpc: can't find service " + req.ServiceMethod)
       return
   }
   svc = svci.(*service)
   //获取出方法的类型
   mtype = svc.method[methodName]
   if mtype == nil {
       err = errors.New("rpc: can't find method " + req.ServiceMethod)
   }

循环处理,不断读取链接上的字节流,解密出请求,调用方法,编码响应,回写到客户端.

func (server *Server) ServeCodec(codec ServerCodec) {
   sending := new(sync.Mutex)
   for {
       //读取请求
       service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
       if err != nil {
           ...
       }
       //调用
       go service.call(server, sending, mtype, req, argv, replyv, codec)
   }
   codec.Close()
}

通过参数进行函数调用

func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
   mtype.Lock()
   mtype.numCalls++
   mtype.Unlock()
   function := mtype.method.Func
   // 通过反射进行函数调用
   returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
   // 返回值是不为空时,则取出错误的string
   errInter := returnValues[0].Interface()
   errmsg := ""
   if errInter != nil {
       errmsg = errInter.(error).Error()
   }
   //发送相应,并释放请求结构
   server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
   server.freeRequest(req)
}

3.client端

// 异步调用
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
}
// 同步调用
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
}
// Call represents an active RPC.
type Call struct {
   ServiceMethod string      // 服务名及方法名 格式:服务.方法
   Args          interface{} // 函数的请求参数 (*struct).
   Reply         interface{} // 函数的响应参数 (*struct).
   Error         error       // 方法完成后 error的状态.
   Done          chan *Call  // 方法调用结束后的channel.
}

client端部分则相对要简单很多,主要提供Call以及Go两个方法,分别表示同步调用以及异步调用,但其实同步调用底层实现其实也是异步调用,调用时主要用到了Call结构,相关解释如上.

3.1 主要流程

go rpc 源码分析

3.2 关键代码

发送请求部分代码,每次send一次请求,均生成一个call对象,并使用seq作为key保存在map中,服务端返回时从map取出call,进行相应处理

func (client *Client) send(call *Call) {
   //请求级别的锁
   client.reqMutex.Lock()
   defer client.reqMutex.Unlock()
   // Register this call.
   client.mutex.Lock()
   if client.shutdown || client.closing {
       call.Error = ErrShutdown
       client.mutex.Unlock()
       call.done()
       return
   }
   //生成seq,每次调用均生成唯一的seq,在服务端相应后会通过该值进行匹配
   seq := client.seq
   client.seq++
   client.pending[seq] = call
   client.mutex.Unlock()
   // 请求并发送请求
   client.request.Seq = seq
   client.request.ServiceMethod = call.ServiceMethod
   err := client.codec.WriteRequest(&client.request, call.Args)
   if err != nil {
       //发送请求错误时,将map中call对象删除.
       client.mutex.Lock()
       call = client.pending[seq]
       delete(client.pending, seq)
       client.mutex.Unlock()
       if call != nil {
           call.Error = err
           call.done()
       }
   }
}

接收响应部分的代码,这里是一个for循环,不断读取tcp上的流,并解码成Response对象以及方法的Reply对象.

func (client *Client) input() {
   var err error
   var response Response
   for err == nil {
       response = Response{}
       err = client.codec.ReadResponseHeader(&response)
       if err != nil {
           break
       }
       //通过response中的 Seq获取call对象
       seq := response.Seq
       client.mutex.Lock()
       call := client.pending[seq]
       delete(client.pending, seq)
       client.mutex.Unlock()
       switch {
       case call == nil:
           err = client.codec.ReadResponseBody(nil)
           if err != nil {
               err = errors.New("reading error body: " + err.Error())
           }
       case response.Error != "":
           //服务端返回错误,直接将错误返回
           call.Error = ServerError(response.Error)
           err = client.codec.ReadResponseBody(nil)
           if err != nil {
               err = errors.New("reading error body: " + err.Error())
           }
           call.done()
       default:
           //通过编码器,将Resonse的body部分解码成reply对象.
           err = client.codec.ReadResponseBody(call.Reply)
           if err != nil {
               call.Error = errors.New("reading body " + err.Error())
           }
           call.done()
       }
   }
   // 客户端退出处理
   client.reqMutex.Lock()
   client.mutex.Lock()
   client.shutdown = true
   closing := client.closing
   if err == io.EOF {
       if closing {
           err = ErrShutdown
       } else {
           err = io.ErrUnexpectedEOF
       }
   }
   for _, call := range client.pending {
       call.Error = err
       call.done()
   }
   client.mutex.Unlock()
   client.reqMutex.Unlock()
   if debugLog && err != io.EOF && !closing {
       log.Println("rpc: client protocol error:", err)
   }
}

4.一些坑

  • 同步调用无法超时

由于原生rpc只提供两个方法,同步的Call以及异步的Go,同步的Call服务端不返回则会一直阻塞,这里如果存在大量的不返回,会导致协程一直无法释放.

  • 异步调用超时后会内存泄漏

基于异步调用加channel实现超时功能也会存在泄漏问题,原因是client的请求会存在map结构中,Go函数退出并不会清理map的内容,因此如果server端不返回的话,map中的请求会一直存储,从而导致内存泄漏.

5. 总结

总的来说,go原生rpc算是个基础版本的rpc,代码精简,可扩展性高,但是只是实现了rpc最基本的网络通讯,像超时熔断,链接管理(保活与重连),服务注册发现,还是欠缺的,因此还是达不到生产环境开箱即用,不过git就有一个基于rpc的功能增强版本,叫rpcx,支持了大部分主流rpc的特性.

6. 参考

rpc https://golang.org/pkg/net/rpc/


社区活动

            (仅剩一个名额)


版权申明:内容来源网络(彬哥整理),版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢。



Golang语言社区

ID:Golangweb

游戏服务器架构丨分布式技术丨大数据丨游戏算法学习


以上是关于go rpc 源码分析的主要内容,如果未能解决你的问题,请参考以下文章

[Ethereum] 以太坊源码分析RPC

go-zero框架之zrpc.RpcServerConf配置源码分析

GRPC框架源码分析

SOFA 源码分析 —— 服务发布过程

透视RPC协议:SOFA-BOLT协议源码分析

Android 插件化VirtualApp 源码分析 ( 目前的 API 现状 | 安装应用源码分析 | 安装按钮执行的操作 | 返回到 HomeActivity 执行的操作 )(代码片段