[go微服务-16] Go RPC 实现服务间通信

Posted IT技术小屋

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[go微服务-16] Go RPC 实现服务间通信相关的知识,希望对你有一定的参考价值。

引言


Go RPC是指Go语言原生支持的RPC框架

非常适合作为后续深入了解 RPC框架时的研究对象


先通过一个字符串服务为案例简单讲解Go RPC是如何进行通信的

然后再具体剖析Go RPC的底层原理和实现


Go语言RPC过程调用实践


服务端只需实现对外提供的远程过程方法和结构体然后将其注册到RPC服务中

客户端就可以通过其服务名称和方法名称进行RPC方法调用

type StringRequest struct {A stringstring}
type Service interface {// Concat a and bConcat(req StringRequest, ret*string) error}
type StringService struct {}
func (s StringService) Concat(req StringRequest, ret *string) error {//test for length overflowif len(req.A)+len(req.B) > StrMaxSize {*ret=""return ErrMaxSize}*ret = req.A + req.Breturn nil}
func main(){stringService :=new(service.StringService)rpc.Register(stringService)rpc.HandleHTTP()l, e :=net.Listen("tcp""127.0.0.1:1234")if e !=nil {log.Fatal("listen error:", e)}http.Serve(l, nil)}
func main(){client, err :=rpc.DialHTTP("tcp", "127.0.0.1:1234")if err != nil {log.Fatal("dialing:", err)}stringReq :=&service.StringRequest{"A", "B"}var reply stringerr = client.Call("StringService.Concat", stringReq, &reply)if err != nil {log.Fatal("Concat error:", err)}//异步的调用方式call := client.Call("stringService.Concat", stringReq, &reply)_:=<-call.Done}


Go RPC原理解析


首先对 RPC的服务(Server)端代码进行分析

包括注册服务、反射处理和存根保存

然后讲解服务端处理RPC请求的流程

最后讲解客户(Client)端的RPC请求处理


[go微服务-16] Go RPC 实现服务间通信

func (server *Server) register(rcvr interfacel, name string, useName bool) error{//如果服务为空,默认注册一个if server.serviceMap ==nil {server.serviceMap = make(map[string]*service)}//获取注册服务的反射信息s:=new(service)s.typ = reflect.TypeOf(rcvr)s.rcvr = reflect.ValueOf(rcvr)//可以使用自定义名称sname := reflect.lndirect(s.rcvr).Type().Name()if useName {sname = name}//方法必须是暴露的,既服务名首字符大写;不允许重复注册。代码有省略if !isExported(sname)&&!useName {}if_, present := server.serviceMap[sname];present {}s.name = sname/开始注册 rpc struct内部的方法存根s.method = suitableMethods(s.typ, true)if len(s.method)==O{//如果struct内部一个方法也没,那么直接报错,打印详细的错误信息}//保存在server的serviceMap中server.serviceMap[s.name] =sreturn nil}

(1)接收请求

func(server *Server)Accept(lis net.Listener){for {conn, err := lis.Accept()if err !=nil{log.Fatal("rpc.Serve: accept:", err.Error())}// accept连接以后,打开一个goroutine处理请求go server.ServeConn(conn)}}

(2)读取并解析请求

func (server *Server) ServeConn(conn io.ReadWriteCloser){buf := bufio.NewWriter(conn)srv := &gobServerCodec{rwc: conn,dec: gob.NewDecoder(conn),enc: gob.NewEncoder(buf),encBuf: buf,}//根据指定的codec进行协议解析server.ServeCodec(srv)}
func (server *Server) ServeCodec(codec ServerCodec){sending:= new(sync.Mutex)for {//解析请求service, mtype, req, argv, replyv, keepReading, err := server.readRequest (codec)if err !=nil {if debugLog && err !=io.EOF {log.Println("rpc:", err)}
if !keepReading{break}// send a response if we actually managed to read a header.//如果当前请求错误了,我们应该返回信息,然后继续处理if req != nil {server.sendResponse(sending, req, invalidRequest, codec, err.Error())server.freeRequest(req)}continue}//因为需要继续处理后续请求,所以开一个gorutine处理rpc方法go service.call(server, sending, mtype, req, argv, replyv, codec)//如果连接关闭了需要释放资源codec.Close()}

(3)执行远程方法并返回响应

func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType,req *Request, argv, replyv reflect.Value, codec ServerCodec){function := mtype.method.Func//这里是真正调用rpc方法的地方returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})errlnter := returnValues[0].Interface()errmsg :=""//处理返回请求了server.sendResponse(sending, req, replyv.Interface(), codec,errmsg)server.freeRequest(req)}


客户端发送RPC请求原理


(1)同步调用和异步调用

func (client *Client)Call(serviceMethod string, args interfacel}, reply interfacel) error{call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Donereturn call.Error}
//异步调用实现func (client*Client) Go(serviceMethod string, argsinterface{l}, reply interface{l}, done chan *Call) *Call {//初始化Callcall := new(Call)call.ServiceMethod = serviceMethodcall.Args = argscall.Reply =replyif done == nil {done = make(chan *Call,10)// buffered.}else {if cap(done)==0{log.Panic("rpc: done channel is unbuffered")}}call.Done = done//调用Client的send方法client.send(call)return call}
type Call struct {ServiceMethod string //服务名及方法名格式:服务.方法Args interface //函数的请求参数(*struct).Reply interfacel //函数的响应参数(*struct).Error error //方法完成后error的状态.Done chan *Call //方法调用结束后的channel.}}

(2)请求参数编码

func (client*Client) send(call *Call){//....//生成seq,每次调用均生成唯一的seq,在服务端返回结果后会通过该值进行匹配seq :=client.seqclient.seq++client.pending[seq] = callclient.mutex.Unlock()//请求并发送请求client.request.Seq = seqclient.request.ServiceMethod = call.ServiceMethoderr := client.codec.WriteRequest(&client.request, call.Args)if err != nil {//发送请求错误时,将map中call对象删除.client.mutex.Lock()call= client.pending[seq]delete(client.pending, seq)client.mutex.Unlock()}}

(3)接收返回值

func (client *Client) input(){var err errorvar response Responsefor err == nil {response = Response{}//通过response中的 Seq获取call对象seq := response.Seqclient.mutex.Lock()call :=client.pending[seq]delete(client.pending, seq)client.mutex.Unlock()switch {case call == nil:case response.Error != ""://上述两个case,一个处理call为nil,另外处理服务端返回的错误,直接将错误返回default://通过编码器,将Resonse的body部分解码成reply对象.err = client.codec.ReadResponseBody(call.Reply)if err != nil {call.Error = errors.New("reading body " + err.Error())}call.done()}}}}
func (call *Call) done(){select {case call.Done <- call:// okdefault:if debugLog {log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")}}}


小结


Go语言原生RPC代码精简、可扩展性高但是只实现了RPC最基本的网络通信

GitHub有一个基于RPC的功能增强版本一—rpcx

支持了大部分主流RPC的特性


本课时我们从源码角度分析了Go语言原生RPC框架

希望能给你带来对 RPC框架的整体认知


以上是关于[go微服务-16] Go RPC 实现服务间通信的主要内容,如果未能解决你的问题,请参考以下文章

Go微服务—— RPC

Go微服务—— RPC

go-micro微服务框架

Go微服务框架-2.Go语言RPC编程实践

go微服务RPC的原理与Go RPC

go微服务RPC的原理与Go RPC