手写RPC框架 第六天 负载均衡

Posted Harris-H

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了手写RPC框架 第六天 负载均衡相关的知识,希望对你有一定的参考价值。

手写RPC框架 第六天 负载均衡

1.负载均衡策略

假设有多个服务实例,每个实例提供相同的功能,为了提高整个系统的吞吐量,每个实例部署在不同的机器上。客户端可以选择任意一个实例进行调用,获取想要的结果。那如何选择呢?取决了负载均衡的策略。对于 RPC 框架来说,我们可以很容易地想到这么几种策略:

  • 随机选择策略 - 从服务列表中随机选择一个。
  • 轮询算法(Round Robin) - 依次调度不同的服务器,每次调度执行 i = (i + 1) mode n。
  • 加权轮询(Weight Round Robin) - 在轮询算法的基础上,为每个服务实例设置一个权重,高性能的机器赋予更高的权重,也可以根据服务实例的当前的负载情况做动态的调整,例如考虑最近5分钟部署服务器的 CPU、内存消耗情况。
  • 哈希/一致性哈希策略 - 依据请求的某些特征,计算一个 hash 值,根据 hash 值将请求发送到对应的机器。一致性 hash 还可以解决服务实例动态添加情况下,调度抖动的问题。一致性哈希的一个典型应用场景是分布式缓存服务。感兴趣可以阅读动手写分布式缓存 - GeeCache第四天 一致性哈希(hash)

2.服务发现

负载均衡的前提是有多个服务实例,那我们首先实现一个最基础的服务发现模块 Discovery。为了与通信部分解耦,这部分的代码统一放置在 xclient 子目录下。

定义 2 个类型:

  • SelectMode 代表不同的负载均衡策略,简单起见,GeeRPC 仅实现 Random 和 RoundRobin 两种策略。
  • Discovery 是一个接口类型,包含了服务发现所需要的最基本的接口。
    • Refresh() 从注册中心更新服务列表
    • Update(servers []string) 手动更新服务列表
    • Get(mode SelectMode) 根据负载均衡策略,选择一个服务实例
    • GetAll() 返回所有的服务实例
package xclient

import (
	"errors"
	"math"
	"math/rand"
	"sync"
	"time"
)

type SelectMode int

const (
	RandomSelect     SelectMode = iota // select randomly
	RoundRobinSelect                   // select using Robbin algorithm
)

type Discovery interface 
	Refresh() error // refresh from remote registry
	Update(servers []string) error
	Get(mode SelectMode) (string, error)
	GetAll() ([]string, error)

紧接着,我们实现一个不需要注册中心,服务列表由手工维护的服务发现的结构体:MultiServersDiscovery

// MultiServersDiscovery is a discovery for multi servers without a registry center
// user provides the server addresses explicitly instead
type MultiServersDiscovery struct 
	r       *rand.Rand   // generate random number
	mu      sync.RWMutex // protect following
	servers []string
	index   int // record the selected position for robin algorithm


// NewMultiServerDiscovery creates a MultiServersDiscovery instance
func NewMultiServerDiscovery(servers []string) *MultiServersDiscovery 
	d := &MultiServersDiscovery
		servers: servers,
		r:       rand.New(rand.NewSource(time.Now().UnixNano())),
	
	d.index = d.r.Intn(math.MaxInt32 - 1)
	return d

  • r 是一个产生随机数的实例,初始化时使用时间戳设定随机数种子,避免每次产生相同的随机数序列。
  • index 记录 Round Robin 算法已经轮询到的位置,为了避免每次从 0 开始,初始化时随机设定一个值。

然后,实现 Discovery 接口

var _ Discovery = (*MultiServersDiscovery)(nil)

// Refresh doesn't make sense for MultiServersDiscovery, so ignore it
func (d *MultiServersDiscovery) Refresh() error 
	return nil


// Update the servers of discovery dynamically if needed
func (d *MultiServersDiscovery) Update(servers []string) error 
	d.mu.Lock()
	defer d.mu.Unlock()
	d.servers = servers
	return nil


// Get a server according to mode
func (d *MultiServersDiscovery) Get(mode SelectMode) (string, error) 
	d.mu.Lock()
	defer d.mu.Unlock()
	n := len(d.servers)
	if n == 0 
		return "", errors.New("rpc discovery: no available servers")
	
	switch mode 
	case RandomSelect:
		return d.servers[d.r.Intn(n)], nil
	case RoundRobinSelect:
		s := d.servers[d.index%n] // servers could be updated, so mode n to ensure safety
		d.index = (d.index + 1) % n
		return s, nil
	default:
		return "", errors.New("rpc discovery: not supported select mode")
	


// returns all servers in discovery
func (d *MultiServersDiscovery) GetAll() ([]string, error) 
	d.mu.RLock()
	defer d.mu.RUnlock()
	// return a copy of d.servers
	servers := make([]string, len(d.servers), len(d.servers))
	copy(servers, d.servers)
	return servers, nil

就是实现了基于不同策略选择服务器的一个服务发现struct。


3.支持负载均衡的客户端

接下来,我们向用户暴露一个支持负载均衡的客户端 XClient。

day6-load-balance/xclient/xclient.go

package xclient

import (
	"context"
	. "geerpc"
	"io"
	"reflect"
	"sync"
)

type XClient struct 
	d       Discovery
	mode    SelectMode
	opt     *Option
	mu      sync.Mutex // protect following
	clients map[string]*Client


var _ io.Closer = (*XClient)(nil)

func NewXClient(d Discovery, mode SelectMode, opt *Option) *XClient 
	return &XClientd: d, mode: mode, opt: opt, clients: make(map[string]*Client)


func (xc *XClient) Close() error 
	xc.mu.Lock()
	defer xc.mu.Unlock()
	for key, client := range xc.clients 
		// I have no idea how to deal with error, just ignore it.
		_ = client.Close()
		delete(xc.clients, key)
	
	return nil

XClient 的构造函数需要传入三个参数,服务发现实例 Discovery、负载均衡模式 SelectMode 以及协议选项 Option。为了尽量地复用已经创建好的 Socket 连接,使用 clients 保存创建成功的 Client 实例,并提供 Close 方法在结束后,关闭已经建立的连接。

接下来,实现客户端最基本的功能 Call

func (xc *XClient) dial(rpcAddr string) (*Client, error) 
	xc.mu.Lock()
	defer xc.mu.Unlock()
	client, ok := xc.clients[rpcAddr]
	if ok && !client.IsAvailable() 
		_ = client.Close()
		delete(xc.clients, rpcAddr)
		client = nil
	
	if client == nil 
		var err error
		client, err = XDial(rpcAddr, xc.opt)
		if err != nil 
			return nil, err
		
		xc.clients[rpcAddr] = client
	
	return client, nil


func (xc *XClient) call(rpcAddr string, ctx context.Context, serviceMethod string, args, reply interface) error 
	client, err := xc.dial(rpcAddr)
	if err != nil 
		return err
	
	return client.Call(ctx, serviceMethod, args, reply)


// Call invokes the named function, waits for it to complete,
// and returns its error status.
// xc will choose a proper server.
func (xc *XClient) Call(ctx context.Context, serviceMethod string, args, reply interface) error 
	rpcAddr, err := xc.d.Get(xc.mode)
	if err != nil 
		return err
	
	return xc.call(rpcAddr, ctx, serviceMethod, args, reply)

我们将复用 Client 的能力封装在方法 dial 中,dial 的处理逻辑如下:

  1. 检查 xc.clients 是否有缓存的 Client,如果有,检查是否是可用状态,如果是则返回缓存的 Client,如果不可用,则从缓存中删除。
  2. 如果步骤 1) 没有返回缓存的 Client,则说明需要创建新的 Client,缓存并返回。

另外,我们为 XClient 添加一个常用功能:Broadcast

// Broadcast invokes the named function for every server registered in discovery
func (xc *XClient) Broadcast(ctx context.Context, serviceMethod string, args, reply interface) error 
	servers, err := xc.d.GetAll()
	if err != nil 
		return err
	
	var wg sync.WaitGroup
	var mu sync.Mutex // protect e and replyDone
	var e error
	replyDone := reply == nil // if reply is nil, don't need to set value
	ctx, cancel := context.WithCancel(ctx)
	for _, rpcAddr := range servers 
		wg.Add(1)
		go func(rpcAddr string) 
			defer wg.Done()
			var clonedReply interface
			if reply != nil 
				clonedReply = reflect.New(reflect.ValueOf(reply).Elem().Type()).Interface()
			
			err := xc.call(rpcAddr, ctx, serviceMethod, args, clonedReply)
			mu.Lock()
			if err != nil && e == nil 
				e = err
				cancel() // if any call failed, cancel unfinished calls
			
			if err == nil && !replyDone 
				reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(clonedReply).Elem())
				replyDone = true
			
			mu.Unlock()
		(rpcAddr)
	
	wg.Wait()
	return e

Broadcast 将请求广播到所有的服务实例,如果任意一个实例发生错误,则返回其中一个错误;如果调用成功,则返回其中一个的结果。有以下几点需要注意:

  1. 为了提升性能,请求是并发的。
  2. 并发情况下需要使用互斥锁保证 error 和 reply 能被正确赋值。
  3. 借助 context.WithCancel 确保有错误发生时,快速失败。

实现了通过广播的方式,高并发请求。

以上是关于手写RPC框架 第六天 负载均衡的主要内容,如果未能解决你的问题,请参考以下文章

手写dubbo-6rpc调用引入注册中心实现服务动态扩容

手写一个RPC框架,理解更透彻(附源码)

手写RPC框架,理解更透彻,代码已上传Github!

项目一:第六天 WebService写接口 和CXF框架

七天入门Go语言 网络编程 | 第六天 炉火纯青

RPC框架的负载均衡机制解析