手写RPC Day2 高性能客户端

Posted Harris-H

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了手写RPC Day2 高性能客户端相关的知识,希望对你有一定的参考价值。

手写RPC Day2 高性能客户端

传送门

1.Call 的设计

net/rpc 而言,一个函数需要能够被远程调用,需要满足如下五个条件:

  • the method’s type is exported.
  • the method is exported.
  • the method has two arguments, both exported (or builtin) types.
  • the method’s second argument is a pointer.
  • the method has return type error.

更直观一些:

func (t *T) MethodName(argType T1, replyType *T2) error

根据上述要求,首先我们封装了结构体 Call 来承载一次 RPC 调用所需要的信息。

// Call represents an active RPC.
type Call struct 
	Seq           uint64
	ServiceMethod string      // format "<service>.<method>"
	Args          interface // arguments to the function
	Reply         interface // reply from the function
	Error         error       // if error occurs, it will be set
	Done          chan *Call  // Strobes when call is complete.


func (call *Call) done() 
	call.Done <- call

2.实现客户端

接下来,我们将实现 GeeRPC 客户端最核心的部分 Client。

// Client represents an RPC Client.
// There may be multiple outstanding Calls associated
// with a single Client, and a Client may be used by
// multiple goroutines simultaneously.
type Client struct 
	cc       codec.Codec
	opt      *Option
	sending  sync.Mutex // protect following
	header   codec.Header
	mu       sync.Mutex // protect following
	seq      uint64
	pending  map[uint64]*Call
	closing  bool // user has called Close
	shutdown bool // server has told us to stop


var _ io.Closer = (*Client)(nil)

var ErrShutdown = errors.New("connection is shut down")

// Close the connection
func (client *Client) Close() error 
	client.mu.Lock()
	defer client.mu.Unlock()
	if client.closing 
		return ErrShutdown
	
	client.closing = true
	return client.cc.Close()


// IsAvailable return true if the client does work
func (client *Client) IsAvailable() bool 
	client.mu.Lock()
	defer client.mu.Unlock()
	return !client.shutdown && !client.closing

  • cc 是消息的编解码器,和服务端类似,用来序列化将要发送出去的请求,以及反序列化接收到的响应。
  • sending 是一个互斥锁,和服务端类似,为了保证请求的有序发送,即防止出现多个请求报文混淆。
  • header 是每个请求的消息头,header 只有在请求发送时才需要,而请求发送是互斥的,因此每个客户端只需要一个,声明在 Client 结构体中可以复用。
  • seq 用于给发送的请求编号,每个请求拥有唯一编号。这里本质是一个 记数器,为了方便给每个call的seq编号。
  • pending 存储未处理完的请求,键是编号,值是 Call 实例。
  • closing 和 shutdown 任意一个值置为 true,则表示 Client 处于不可用的状态,但有些许的差别,closing 是用户主动关闭的,即调用 Close 方法,而 shutdown 置为 true 一般是有错误发生。

在 main 函数中使用了 client.Call 并发了 5 个 RPC 同步调用,参数和返回值的类型均为 string。

func main() 
    log.SetFlags(0)
	addr := make(chan string)
	go startServer(addr)
	client, _ := geerpc.Dial("tcp", <-addr)
	defer func()  _ = client.Close() ()

	time.Sleep(time.Second)
	// send request & receive response
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ 
		wg.Add(1)
		go func(i int) 
			defer wg.Done()
			args := fmt.Sprintf("geerpc req %d", i)
			var reply string
			if err := client.Call("Foo.Sum", args, &reply); err != nil 
				log.Fatal("call Foo.Sum error:", err)
			
			log.Println("reply:", reply)
		(i)
	
	wg.Wait()

3.流程

client, _ := geerpc.Dial("tcp", <-addr) 创建client。

调用client.Call发送请求,func (client *Client) receive()处理请求,与Day1类似。

那么首先实现接收功能,接收到的响应有三种情况:

  • call 不存在,可能是请求没有发送完整,或者因为其他原因被取消,但是服务端仍旧处理了。
  • call 存在,但服务端处理出错,即 h.Error 不为空。
  • call 存在,服务端处理正常,那么需要从 body 中读取 Reply 的值。
func (client *Client) receive() 
	var err error
	for err == nil 
		var h codec.Header
		if err = client.cc.ReadHeader(&h); err != nil 
			break
		
		call := client.removeCall(h.Seq)
		switch 
		case call == nil:
			// it usually means that Write partially failed
			// and call was already removed.
			err = client.cc.ReadBody(nil)
		case h.Error != "":
			call.Error = fmt.Errorf(h.Error)
			err = client.cc.ReadBody(nil)
			call.done()
		default:
			err = client.cc.ReadBody(call.Reply)
			if err != nil 
				call.Error = errors.New("reading body " + err.Error())
			
			call.done()
		
	
	// error occurs, so terminateCalls pending calls
	client.terminateCalls(err)

请求处理完后,然后响应报文给client。


这里支持并发的原因是因为client拥有请求的一个队列map[uint64]*Call,一个chan *Call通道,可以支持发送多个请求,保存多个响应。

响应采用阻塞同步的形式。每次server这边响应后,然后client使用receive()函数接受对应call的reply,然后发送call.Done()表示成功接收响应。

与Day1不同的是,Day2将client封装起来了。

输出结果一样。

以上是关于手写RPC Day2 高性能客户端的主要内容,如果未能解决你的问题,请参考以下文章

手写RPC框架,理解更透彻,代码已上传Github!

java 从零开始手写 RPC (07)-timeout 超时处理

java 从零开始手写 RPC (04) -序列化

手写RPC,深入底层理解整个RPC通信

java 从零开始手写 RPC (05) reflect 反射实现通用调用之服务端

10个类手写实现 RPC 通信框架原理