Go分布式缓存 分布式节点(day5)

Posted Harris-H

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Go分布式缓存 分布式节点(day5)相关的知识,希望对你有一定的参考价值。

Go分布式缓存 分布式节点(day5)

我们在GeeCache 第二天 中描述了 geecache 的流程。在这之前已经实现了流程 ⑴ 和 ⑶,今天实现流程 ⑵,从远程节点获取缓存值。

我们进一步细化流程 ⑵:

使用一致性哈希选择节点        是                                    是
    |-----> 是否是远程节点 -----> HTTP 客户端访问远程节点 --> 成功?-----> 服务端返回返回值
                    |  否                                    ↓  否
                    |----------------------------> 回退到本地节点处理。

2 抽象 PeerPicker

day5-multi-nodes/geecache/peers.go - github

package geecache

// PeerPicker is the interface that must be implemented to locate
// the peer that owns a specific key.
type PeerPicker interface 
	PickPeer(key string) (peer PeerGetter, ok bool)


// PeerGetter is the interface that must be implemented by a peer.
type PeerGetter interface 
	Get(group string, key string) ([]byte, error)

  • 在这里,抽象出 2 个接口,PeerPicker 的 PickPeer() 方法用于根据传入的 key 选择相应节点 PeerGetter。
  • 接口 PeerGetter 的 Get() 方法用于从对应 group 查找缓存值。PeerGetter 就对应于上述流程中的 HTTP 客户端。

3 节点选择与 HTTP 客户端

GeeCache 第三天 中我们为 HTTPPool 实现了服务端功能,通信不仅需要服务端还需要客户端,因此,我们接下来要为 HTTPPool 实现客户端的功能。

首先创建具体的 HTTP 客户端类 httpGetter,实现 PeerGetter 接口。

day5-multi-nodes/geecache/http.go - github

type httpGetter struct 
	baseURL string


func (h *httpGetter) Get(group string, key string) ([]byte, error) 
	u := fmt.Sprintf(
		"%v%v/%v",
		h.baseURL,
		url.QueryEscape(group),
		url.QueryEscape(key),
	)
	res, err := http.Get(u)
	if err != nil 
		return nil, err
	
	defer res.Body.Close()

	if res.StatusCode != http.StatusOK 
		return nil, fmt.Errorf("server returned: %v", res.Status)
	

	bytes, err := ioutil.ReadAll(res.Body)
	if err != nil 
		return nil, fmt.Errorf("reading response body: %v", err)
	

	return bytes, nil


var _ PeerGetter = (*httpGetter)(nil)
  • baseURL 表示将要访问的远程节点的地址,例如 http://example.com/_geecache/
  • 使用 http.Get() 方式获取返回值,并转换为 []bytes 类型。

第二步,为 HTTPPool 添加节点选择的功能。

const (
	defaultBasePath = "/_geecache/"
	defaultReplicas = 50
)
// HTTPPool implements PeerPicker for a pool of HTTP peers.
type HTTPPool struct 
	// this peer's base URL, e.g. "https://example.net:8000"
	self        string
	basePath    string
	mu          sync.Mutex // guards peers and httpGetters
	peers       *consistenthash.Map
	httpGetters map[string]*httpGetter // keyed by e.g. "http://10.0.0.2:8008"

  • 新增成员变量 peers,类型是一致性哈希算法的 Map,用来根据具体的 key 选择节点。
  • 新增成员变量 httpGetters,映射远程节点与对应的 httpGetter。每一个远程节点对应一个 httpGetter,因为 httpGetter 与远程节点的地址 baseURL 有关。

第三步,实现 PeerPicker 接口。

// Set updates the pool's list of peers.
func (p *HTTPPool) Set(peers ...string) 
	p.mu.Lock()
	defer p.mu.Unlock()
	p.peers = consistenthash.New(defaultReplicas, nil)
	p.peers.Add(peers...)
	p.httpGetters = make(map[string]*httpGetter, len(peers))
	for _, peer := range peers 
		p.httpGetters[peer] = &httpGetterbaseURL: peer + p.basePath
	


// PickPeer picks a peer according to key
func (p *HTTPPool) PickPeer(key string) (PeerGetter, bool) 
	p.mu.Lock()
	defer p.mu.Unlock()
	if peer := p.peers.Get(key); peer != "" && peer != p.self 
		p.Log("Pick peer %s", peer)
		return p.httpGetters[peer], true
	
	return nil, false


var _ PeerPicker = (*HTTPPool)(nil)
  • Set() 方法实例化了一致性哈希算法,并且添加了传入的节点。
  • 并为每一个节点创建了一个 HTTP 客户端 httpGetter
  • PickerPeer() 包装了一致性哈希算法的 Get() 方法,根据具体的 key,选择节点,返回节点对应的 HTTP 客户端。

至此,HTTPPool 既具备了提供 HTTP 服务的能力,也具备了根据具体的 key,创建 HTTP 客户端从远程节点获取缓存值的能力。

4 实现主流程

最后,我们需要将上述新增的功能集成在主流程(geecache.go)中。

day5-multi-nodes/geecache/geecache.go - github

// A Group is a cache namespace and associated data loaded spread over
type Group struct 
	name      string
	getter    Getter
	mainCache cache
	peers     PeerPicker


// RegisterPeers registers a PeerPicker for choosing remote peer
func (g *Group) RegisterPeers(peers PeerPicker) 
	if g.peers != nil 
		panic("RegisterPeerPicker called more than once")
	
	g.peers = peers


func (g *Group) load(key string) (value ByteView, err error) 
	if g.peers != nil 
		if peer, ok := g.peers.PickPeer(key); ok 
			if value, err = g.getFromPeer(peer, key); err == nil 
				return value, nil
			
			log.Println("[GeeCache] Failed to get from peer", err)
		
	

	return g.getLocally(key)


func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) 
	bytes, err := peer.Get(g.name, key)
	if err != nil 
		return ByteView, err
	
	return ByteViewb: bytes, nil

  • 新增 RegisterPeers() 方法,将 实现了 PeerPicker 接口的 HTTPPool 注入到 Group 中。
  • 新增 getFromPeer() 方法,使用实现了 PeerGetter 接口的 httpGetter 从访问远程节点,获取缓存值。
  • 修改 load 方法,使用 PickPeer() 方法选择节点,若非本机节点,则调用 getFromPeer() 从远程获取。若是本机节点或失败,则回退到 getLocally()

5 总结

创建多个结点,在测试中创建了三个节点对应三个端口的服务。

启用了API服务器用于交互。

利用一致性哈希选择节点获取value。

以上是关于Go分布式缓存 分布式节点(day5)的主要内容,如果未能解决你的问题,请参考以下文章

分布式缓存服务(Redis)7天实战营~Day5实践作业完成步骤

Go分布式缓存 一致性哈希(hash)(day4)

Go分布式缓存 一致性哈希(hash)(day4)

Go分布式缓存 使用 Protobuf 通信(day7)

Go分布式缓存 使用 Protobuf 通信(day7)

通过 Go 搞懂一致性hash的原理和实现