[go微服务-16] Go RPC 实现服务间通信
Posted IT技术小屋
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[go微服务-16] Go RPC 实现服务间通信相关的知识,希望对你有一定的参考价值。
引言
Go RPC是指Go语言原生支持的RPC框架
非常适合作为后续深入了解 RPC框架时的研究对象
先通过一个字符串服务为案例简单讲解Go RPC是如何进行通信的
然后再具体剖析Go RPC的底层原理和实现
Go语言RPC过程调用实践
服务端只需实现对外提供的远程过程方法和结构体然后将其注册到RPC服务中
客户端就可以通过其服务名称和方法名称进行RPC方法调用
type StringRequest struct {
A string
B string
}
type Service interface {// Concat a and b
Concat(req StringRequest, ret*string) error
}
type StringService struct {
}
func (s StringService) Concat(req StringRequest, ret *string) error {
//test for length overflow
if len(req.A)+len(req.B) > StrMaxSize {
*ret=""
return ErrMaxSize
}
*ret = req.A + req.B
return nil
}
func main(){
stringService :=new(service.StringService)
rpc.Register(stringService)
rpc.HandleHTTP()
l, e :=net.Listen("tcp", "127.0.0.1:1234")
if e !=nil {
log.Fatal("listen error:", e)
}
http.Serve(l, nil)
}
func main(){
client, err :=rpc.DialHTTP("tcp", "127.0.0.1:1234")
if err != nil {
log.Fatal("dialing:", err)
}
stringReq :=&service.StringRequest{"A", "B"}
var reply string
err = client.Call("StringService.Concat", stringReq, &reply)
if err != nil {
log.Fatal("Concat error:", err)
}
//异步的调用方式
call := client.Call("stringService.Concat", stringReq, &reply)
_:=<-call.Done
}
Go RPC原理解析
首先对 RPC的服务(Server)端代码进行分析
包括注册服务、反射处理和存根保存
然后讲解服务端处理RPC请求的流程
最后讲解客户(Client)端的RPC请求处理
func (server *Server) register(rcvr interfacel, name string, useName bool) error{
//如果服务为空,默认注册一个
if server.serviceMap ==nil {
server.serviceMap = make(map[string]*service)
}
//获取注册服务的反射信息
s:=new(service)
s.typ = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr)
//可以使用自定义名称
sname := reflect.lndirect(s.rcvr).Type().Name()
if useName {
sname = name
}
//方法必须是暴露的,既服务名首字符大写;不允许重复注册。代码有省略
if !isExported(sname)&&!useName {
}
if_, present := server.serviceMap[sname];present {}
s.name = sname
/开始注册 rpc struct内部的方法存根s.method = suitableMethods(s.typ, true)
if len(s.method)==O{
//如果struct内部一个方法也没,那么直接报错,打印详细的错误信息}
//保存在server的serviceMap中
server.serviceMap[s.name] =s
return nil
}
(1)接收请求
func(server *Server)Accept(lis net.Listener){
for {
conn, err := lis.Accept()
if err !=nil{
log.Fatal("rpc.Serve: accept:", err.Error())
}
// accept连接以后,打开一个goroutine处理请求
go server.ServeConn(conn)
}
}
(2)读取并解析请求
func (server *Server) ServeConn(conn io.ReadWriteCloser){
buf := bufio.NewWriter(conn)
srv := &gobServerCodec{
rwc: conn,
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(buf),
encBuf: buf,
}
//根据指定的codec进行协议解析
server.ServeCodec(srv)
}
func (server *Server) ServeCodec(codec ServerCodec){
sending:= new(sync.Mutex)
for {
//解析请求
service, mtype, req, argv, replyv, keepReading, err := server.readRequest (codec)
if err !=nil {
if debugLog && err !=io.EOF {
log.Println("rpc:", err)
}
if !keepReading{
break
}
// send a response if we actually managed to read a header.
//如果当前请求错误了,我们应该返回信息,然后继续处理
if req != nil {
server.sendResponse(sending, req, invalidRequest, codec, err.Error())
server.freeRequest(req)
}
continue
}
//因为需要继续处理后续请求,所以开一个gorutine处理rpc方法
go service.call(server, sending, mtype, req, argv, replyv, codec)
//如果连接关闭了需要释放资源
codec.Close()
}
(3)执行远程方法并返回响应
func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType,req *Request, argv, replyv reflect.Value, codec ServerCodec){
function := mtype.method.Func
//这里是真正调用rpc方法的地方
returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
errlnter := returnValues[0].Interface()
errmsg :=""
//处理返回请求了
server.sendResponse(sending, req, replyv.Interface(), codec,errmsg)
server.freeRequest(req)
}
客户端发送RPC请求原理
(1)同步调用和异步调用
func (client *Client)Call(serviceMethod string, args interfacel}, reply interfacel) error{
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
//异步调用实现
func (client*Client) Go(serviceMethod string, argsinterface{l}, reply interface{l}, done chan *Call) *Call {
//初始化Call
call := new(Call)
call.ServiceMethod = serviceMethodcall.Args = args
call.Reply =replyif done == nil {
done = make(chan *Call,10)// buffered.
}else {
if cap(done)==0{
log.Panic("rpc: done channel is unbuffered")}
}
call.Done = done
//调用Client的send方法client.send(call)
return call
}
type Call struct {
ServiceMethod string //服务名及方法名格式:服务.方法
Args interface //函数的请求参数(*struct).
Reply interfacel //函数的响应参数(*struct).
Error error //方法完成后error的状态.
Done chan *Call //方法调用结束后的channel.}
}
(2)请求参数编码
func (client*Client) send(call *Call){
//....
//生成seq,每次调用均生成唯一的seq,在服务端返回结果后会通过该值进行匹配
seq :=client.seq
client.seq++
client.pending[seq] = call
client.mutex.Unlock()
//请求并发送请求
client.request.Seq = seq
client.request.ServiceMethod = call.ServiceMethod
err := client.codec.WriteRequest(&client.request, call.Args)
if err != nil {
//发送请求错误时,将map中call对象删除.client.mutex.Lock()
call= client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
}
}
(3)接收返回值
func (client *Client) input(){
var err error
var response Responsefor err == nil {
response = Response{}
//通过response中的 Seq获取call对象seq := response.Seq
client.mutex.Lock()
call :=client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
switch {
case call == nil:
case response.Error != "":
//上述两个case,一个处理call为nil,另外处理服务端返回的错误,直接将错误返回
default:
//通过编码器,将Resonse的body部分解码成reply对象.
err = client.codec.ReadResponseBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
call.done()}
}
}
}
func (call *Call) done(){
select {
case call.Done <- call:
// ok
default:
if debugLog {
log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
}
}
}
小结
Go语言原生RPC代码精简、可扩展性高但是只实现了RPC最基本的网络通信
GitHub有一个基于RPC的功能增强版本一—rpcx
支持了大部分主流RPC的特性
本课时我们从源码角度分析了Go语言原生RPC框架
希望能给你带来对 RPC框架的整体认知
以上是关于[go微服务-16] Go RPC 实现服务间通信的主要内容,如果未能解决你的问题,请参考以下文章