「推荐系统从0到1」服务发现

Posted 算法工厂

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了「推荐系统从0到1」服务发现相关的知识,希望对你有一定的参考价值。

奥利给!

前沿

首先谈谈我对推荐系统的引擎和算法的理解。

现在市面上讲起推荐系统,大多都是讲各种算法,讲的天花乱坠,高深莫测,其实很多算法都是大同小异,核心思想是差不多的,只不过实现手段略有差异。而在工业上,各种复杂算法能够落地的,我认为不多,大部分的厂商,运用的算法都是很集中的那一部分算法。

一套好的推荐系统,对于引擎是非常依赖的,实验显示,响应时长与各项指标之间都是有直接关联的,响应时长越长,指标越低。

作为一个朴实的推荐码农,我还是想从基础做起,朴朴实实,脚踏实地,先把引擎部分做好。当然,算法后面也会有,毕竟引擎和算法缺一不可。

那么,废话少说,推荐引擎,搞起吧!

服务发现

既然是搞引擎,也就是后端,当然要先把架构先搭建起来。

后端服务,微服务已经成为了当前的主流,具有非常多的优点,比如高内聚,单独部署,各自负载均衡等,当然缺点也有,通信更复杂了等。具体就不在这里展开了,有兴趣的兄弟们可以百度,google一下。

在整个流程中,可以分为服务端的要做的,以及客户端要做的,下面依次来看一下。

服务端比较简单,只需要将自己的信息存储到某个存储中。

是不是原理上非常简单!那么进入实操吧!

etcd介绍

以前有zookeeper,而zookeeper可以看到,早就不再维护更新了。

而etcd,用go语言开发,因kubernetes而闻名,在kubernetes中,使用etcd作为分布式存储获取分布式锁。

所以我们当然用更年轻,更轻量,并且也非常稳定的etcd搞了!就是这么喜新厌旧= =

etcd使用raft算法实现的一致性,至于raft算法,可以看下面这个动画演示,很完美生动。

raft动画演示

etcd实战

我这边用docker来做自己的测试环境,上我的docker-compose.yaml

version: '2.2'
services:
  etcd:
    image: gcr.io/etcd-development/etcd:v3.4.13
    container_name: etcd
    restart: always
    ports:
      - 2379:2379
      - 2380:2380
    command:
      - "/usr/local/bin/etcd"
      - "--name"
      - "s1"
      - "--data-dir"
      - "/etcd-data"
      - "--advertise-client-urls"
      - "http://0.0.0.0:2379"
      - "--listen-client-urls"
      - "http://0.0.0.0:2379"
      - "--initial-advertise-peer-urls"
      - "http://0.0.0.0:2380"
      - "--listen-peer-urls"
      - "http://0.0.0.0:2380"
      - "--initial-cluster-token"
      - "tkn"
      - "--initial-cluster"
      - "s1=http://0.0.0.0:2380"
      - "--initial-cluster-state"
      - "new"

如果想通过其他途径安装可以看官方的说明:

安装etcd

那么,既然是存储,我们就来测试一下CRUD吧,还有etcd的租约功能。

CRUD:

# etcdctl put test/key hello
OK
# etcdctl get test/key
test/key
hello
# etcdctl put test/key goodbye
OK
# etcdctl get test/key
test/key
goodbye
# etcdctl del test/key
1
# etcdctl get test/key

租约:

创建租约,120s过期

# etcdctl lease grant 120
lease 3f3575c45fa5ff26 granted with TTL(120s)

查看租约列表

# etcdctl lease list
found 1 leases
3f3575c45fa5ff26

新建kv,并绑定租约

# etcdctl put test/key hello --lease="3f3575c45fa5ff26"
OK

查看租约下的key剩余时间

# etcdctl lease timetolive 3f3575c45fa5ff26 --keys
lease 3f3575c45fa5ff26 granted with TTL(120s), remaining(46s), attached keys([test/key])

查看还存在的key

# etcdctl get --prefix ""
test/key
hello

等租约过期后,查看key,key已被自动删除

# etcdctl lease timetolive 3f3575c45fa5ff26 --keys
lease 3f3575c45fa5ff26 already expired
# etcdctl get --prefix ""

租约续约:

同样创建租约,绑定kv

# etcdctl lease grant 30
lease 3f3575c45fa5ff2c granted with TTL(30s)
# etcdctl put test/key hello --lease="3f3575c45fa5ff2c"
OK

续约

# etcdctl lease keep-alive 3f3575c45fa5ff2c
lease 3f3575c45fa5ff2c keepalived with TTL(30)
lease 3f3575c45fa5ff2c keepalived with TTL(30)
lease 3f3575c45fa5ff2c keepalived with TTL(30)
lease 3f3575c45fa5ff2c keepalived with TTL(30)

打开个新窗口查看租约与key

# etcdctl lease timetolive 3f3575c45fa5ff2c --keys
lease 3f3575c45fa5ff2c granted with TTL(30s), remaining(23s), attached keys([test/key])
# etcdctl get --prefix ""
test/key
hello

发现并没有过期。

golang+grpc+etcd 服务发现终极实战!

先上github仓库:https://github.com/feng-jin/recommender-system

代码目录/go_server/src/lib/discovery/

说一下整个流程:

服务端向etcd注册服务,就是将本服务的信息写进etcd。

客户端大体流程:

  1. 从etcd取服务端地址列表,并watch列表变化,并更新。
  2. 把地址列表写进grpc resolver的resolver.ClientConn的地址列表中。
  3. grpc创建连接,根据负载均衡请求。

整个模块分为7个文件:

  • config.go,配置文件。
  • discovery.go,用于初始化。
  • register.go,用于服务注册。
  • resolver.go,用于解析etcd里注册的服务地址,以及grpc负载均衡。
  • util.go,公共方法。
  • wrapper.go,对外部提供的调用封装。
  • ctx.go,context,设置超时时间。

config.go

package config

import "time"

// etcd
const (
 Timeout        = 15 * time.Second
 Expires        = 10
 TickerInterval = 5
 // scheme
 Scheme = "etcd"
 // etcd中存储key的格式前缀:/scheme/authority/endpoint
 DirFormat = "/%s/%s/%s/"
 // grpc resolver中自定义解析需要提供的格式:scheme://authority/endpoint
 // 其中scheme可以理解为解析策略,authority可以理解为权限管理,endpoint为地址
 TargetFormat = "%s://%s/%s"
)

// server name
const (
 GreetServer = "greet_server"
)

discovery.go

package discovery

import (
 "fmt"
 "go_server/src/lib/discovery/config"
 "go_server/src/lib/logger"
 "strings"

 "go.etcd.io/etcd/clientv3"
)

var (
 client *clientv3.Client
)

// Init 初始化etcd
func Init(etcdAddr string) error {
 var err error
 if client == nil {
  //构建etcd client
  client, err = clientv3.New(clientv3.Config{
   Endpoints:   strings.Split(etcdAddr, ";"),
   DialTimeout: config.Timeout,
  })
  if err != nil {
   logger.Error("连接etcd失败:%s\n", err)
   fmt.Printf("连接etcd失败:%s\n", err)
   return err
  }
 }
 return nil
}

register.go

package discovery

import (
 "context"
 "errors"
 "fmt"
 "go_server/src/lib/discovery/config"
 "os"
 "os/signal"
 "syscall"
 "time"

 "go.etcd.io/etcd/clientv3"
)

//Service 服务端用于服务注册的对象
type Service struct {
 Name string //服务名称
 Host string //{ip}:{port}
 Env  string //所属环境

 Key string //保存在etcd中的key
}

var service *Service

func (s *Service) register() error {
 if s.Env == "" {
  return errors.New("env is null")
 }
 s.Key = fmt.Sprintf(config.DirFormat, config.Scheme, s.Env, s.Name) + s.Host
 ticker := time.NewTicker(time.Second * time.Duration(config.TickerInterval))
 go func() {
  for {
   resp, err := client.Get(context.Background(), s.Key)
   if err != nil {
    fmt.Printf("获取服务地址失败:%s", err)
   } else if resp.Count == 0 { //尚未注册
    err = s.keepAlive()
    if err != nil {
     fmt.Printf("保持连接失败:%s", err)
    }
   }
   <-ticker.C
  }
 }()
 return nil
}

// keepAlive 创建租约,绑定,并续期
func (s *Service) keepAlive() error {
 //创建租约
 leaseResp, err := client.Grant(context.Background(), config.Expires)
 if err != nil {
  fmt.Printf("创建租期失败:%s\n", err)
  return err
 }

 //将服务地址注册到etcd中
 _, err = client.Put(context.Background(), s.Key, s.Host, clientv3.WithLease(leaseResp.ID))
 if err != nil {
  fmt.Printf("注册服务失败:%s", err)
  return err
 }

 //租约续期
 ch, err := client.KeepAlive(context.Background(), leaseResp.ID)
 if err != nil {
  fmt.Printf("租约续期失败:%s\n", err)
  return err
 }

 //清空keepAlive返回的channel
 go func() {
  for {
   <-ch
  }
 }()
 return nil
}

//取消注册
func (s *Service) unRegister() {
 if client != nil {
  _, _ = client.Delete(context.Background(), s.Key)
 }
}

func WaitForClose() {
 ch := make(chan os.Signal, 1)
 signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
 sig := <-ch
 service.unRegister()
 if i, ok := sig.(syscall.Signal); ok {
  os.Exit(int(i))
 } else {
  os.Exit(0)
 }
}

resolver.go

package discovery

import (
 "context"
 "fmt"
 "go_server/src/lib/discovery/config"
 "strings"

 "go.etcd.io/etcd/clientv3"
 "google.golang.org/grpc"
 "google.golang.org/grpc/resolver"
)

//EtcdResolver解析器
type EtcdResolver struct {
 dir        string
 clientConn resolver.ClientConn
}

func Resolver(env string, name string) *grpc.ClientConn {
 //注册etcd解析器
 r := &EtcdResolver{}
 resolver.Register(r)
 target := fmt.Sprintf(config.TargetFormat, r.Scheme(), env, name)
 //客户端连接服务器(负载均衡:轮询) 会同步调用r.Build()
 dailOpts := []grpc.DialOption{
  grpc.WithBalancerName("round_robin"), // grpc内部提供的轮询负载均衡
  grpc.WithInsecure(),
  grpc.WithDefaultCallOptions(
   grpc.MaxCallRecvMsgSize(1024 * 1024 * 16),
  ),
 }
 conn, err := grpc.Dial(target, dailOpts...)
 if err != nil {
  fmt.Println("连接服务器失败:", err)
 }
 return conn
}

func (r *EtcdResolver) Scheme() string {
 return config.Scheme
}

//构建解析器 grpc.Dial()同步调用
func (r *EtcdResolver) Build(target resolver.Target, clientConn resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
 r.clientConn = clientConn
 r.dir = fmt.Sprintf(config.DirFormat, target.Scheme, target.Authority, target.Endpoint)
 go r.watch()
 return r, nil
}

//监听etcd中某个key前缀的服务地址列表的变化
func (r *EtcdResolver) watch() {
 //初始化服务地址列表
 var addrList []resolver.Address

 resp, err := client.Get(context.Background(), r.dir, clientv3.WithPrefix())
 if err != nil {
  fmt.Println("获取服务地址列表失败:", err)
 } else {
  for i := range resp.Kvs {
   fmt.Println(strings.TrimPrefix(string(resp.Kvs[i].Key), r.dir))
   addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(resp.Kvs[i].Key), r.dir)})
  }
 }

 r.clientConn.NewAddress(addrList)

 //监听服务地址列表的变化
 rch := client.Watch(context.Background(), r.dir, clientv3.WithPrefix())
 for n := range rch {
  for _, ev := range n.Events {
   addr := strings.TrimPrefix(string(ev.Kv.Key), r.dir)
   switch ev.Type {
   case clientv3.EventTypePut:
    if !exists(addrList, addr) {
     addrList = append(addrList, resolver.Address{Addr: addr})
     r.clientConn.NewAddress(addrList)
    }
   case clientv3.EventTypeDelete:
    if s, ok := remove(addrList, addr); ok {
     addrList = s
     r.clientConn.NewAddress(addrList)
    }
   }
  }
 }
}

func exists(l []resolver.Address, addr string) bool {
 for i := range l {
  if l[i].Addr == addr {
   return true
  }
 }
 return false
}

func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
 for i := range s {
  if s[i].Addr == addr {
   s[i] = s[len(s)-1]
   return s[:len(s)-1], true
  }
 }
 return nilfalse
}

//Close ...
func (r *EtcdResolver) Close() {}

//ResolveNow ...
func (r *EtcdResolver) ResolveNow(_ resolver.ResolveNowOption) {}

util.go

package discovery

import (
 "fmt"
 "net"
)

// 获取本机ip地址
func getIntranetIP() (ip string) {
 if addrs, err := net.InterfaceAddrs(); err == nil {
  for _, address := range addrs {
   if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
    if ipnet.IP.To4() != nil {
     ip = ipnet.IP.String()
     break
    }
   }
  }
 }
 return
}

// 自动获取本机的ip以及端口号,ip:port格式
func getListener() (listener net.Listener, host string, err error) {
 host = "0.0.0.0:0"
 listener, err = net.Listen("tcp", host)
 if err == nil {
  addr := listener.Addr().String()
  _, portString, _ := net.SplitHostPort(addr)
  host = fmt.Sprintf("%s:%s", getIntranetIP(), portString)
 }
 return
}

wrapper.go

package discovery

import (
 "fmt"
 "go_server/src/lib/discovery/config"
 "go_server/src/lib/proto/greet"

 "google.golang.org/grpc"
)

func GreetRegister(env string, server greet.GreetServer) error {
 listener, host, err := getListener()
 if err != nil {
  fmt.Println("监听网络失败:", err)
  return err
 }
 fmt.Println("host:", host)
 srv := grpc.NewServer()
 go srv.Serve(listener)
 greet.RegisterGreetServer(srv, server)
 service = &Service{Name: config.GreetServer, Host: host, Env: env}
 err = service.register()
 if err != nil {
  fmt.Println(err)
  return err
 }
 return nil
}

func GreetResolve(env string) greet.GreetClient {
 return greet.NewGreetClient(Resolver(env, config.GreetServer))
}

ctx.go

package discovery

import (
 "context"
 "time"
)

// 1s超时
func Context1s() (ctx context.Context, cancel context.CancelFunc) {
 return context.WithTimeout(context.TODO(), time.Second)
}

测试一下吧,测试文件也都在github仓库里:

搞个测试的proto,server和client,也直接上代码:

greet.proto

syntax = "proto3";


option go_package = "src/lib/proto/greet";

service Greet {
rpc Morning(GreetRequest)returns(GreetResponse){}
rpc Night(GreetRequest)returns(GreetResponse){}
}

message GreetRequest {
string name = 1;
}

message GreetResponse {
string message = 1;
string from = 2;
}

server main.go

package main

import (
 "context"
 "flag"
 "fmt"
 "go_server/src/lib/discovery"
 proto "go_server/src/lib/proto/greet"
)

var (
 Flag     = flag.String("flag""a""flag")
 EtcdAddr = flag.String("EtcdAddr""127.0.0.1:2379""register etcd address")
 Env      = flag.String("Env""test""env")
)

//rpc服务接口
type GreetServer struct{}

func (gs *GreetServer) Morning(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
 fmt.Printf("Morning 调用: %s\n", req.Name)
 return &proto.GreetResponse{
  Message: "Good morning, " + req.Name,
  From:    *Flag,
 }, nil
}

func (gs *GreetServer) Night(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
 fmt.Printf("Night 调用: %s\n", req.Name)
 return &proto.GreetResponse{
  Message: "Good night, " + req.Name,
  From:    *Flag,
 }, nil
}

func main() {
 flag.Parse()
 err := discovery.Init(*EtcdAddr)
 if err != nil {
  fmt.Println(err)
  return
 }
 err = discovery.GreetRegister(*Env, &GreetServer{})
 if err != nil {
  fmt.Println(err)
  return
 }
 discovery.WaitForClose()
}

client main.go

package main

import (
 "flag"
 "fmt"
 "go_server/src/lib/discovery"
 proto "go_server/src/lib/proto/greet"
 "time"
)

var (
 EtcdAddr = flag.String("EtcdAddr""127.0.0.1:2379""register etcd address")
 Env      = flag.String("Env""test""env")
)

func main() {
 flag.Parse()
 err := discovery.Init(*EtcdAddr)
 if err != nil {
  fmt.Println(err)
  return
 }
 c := discovery.GreetResolve(*Env)
 ticker := time.NewTicker(1 * time.Second)
 for range ticker.C {
  fmt.Println("Morning 调用...")
  ctx, cancel := discovery.Context1s()
  resp1, err := c.Morning(
   ctx,
   &proto.GreetRequest{Name: "Jinfeng"},
  )
  cancel()
  if err != nil {
   fmt.Println("Morning调用失败:", err)
   return
  }
  fmt.Printf("Morning 响应:%s,来自:%s\n", resp1.Message, resp1.From)

  fmt.Println("Night 调用...")
  ctx, cancel = discovery.Context1s()
  resp2, err := c.Night(
   ctx,
   &proto.GreetRequest{Name: "Jinfeng"},
  )
  cancel()
  if err != nil {
   fmt.Println("Night调用失败:", err)
   return
  }
  fmt.Printf("Night 响应:%s,来自:%s\n", resp2.Message, resp2.From)
 }
}

跑起来吧,起3个server,可以看到,在etcd已经注册了3台服务。

# etcdctl get --prefix ""
/etcd/test/greet_server/192.168.31.71:52963
192.168.31.71:52963
/etcd/test/greet_server/192.168.31.71:52969
192.168.31.71:52969
/etcd/test/greet_server/192.168.31.71:52973
192.168.31.71:52973

client调用

➜  client git:(main) ✗ go run .
192.168.31.71:52963
192.168.31.71:52969
192.168.31.71:52973
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:c
Night 调用...
Night 响应:Good night, Jinfeng,来自:a
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:b
Night 调用...
Night 响应:Good night, Jinfeng,来自:c
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:a
Night 调用...
Night 响应:Good night, Jinfeng,来自:b
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:c

shutdown一台服务

Morning 响应:Good morning, Jinfeng,来自:a
Night 调用...
Night 响应:Good night, Jinfeng,来自:b
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:a
Night 调用...
Night 响应:Good night, Jinfeng,来自:b
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:a
Night 调用...
Night 响应:Good night, Jinfeng,来自:b

重新启动

Morning 响应:Good morning, Jinfeng,来自:a
Night 调用...
Night 响应:Good night, Jinfeng,来自:b
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:c
Night 调用...
Night 响应:Good night, Jinfeng,来自:a
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:b
Night 调用...
Night 响应:Good night, Jinfeng,来自:c
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:a
Night 调用...
Night 响应:Good night, Jinfeng,来自:b

这一轮,只是用grpc内部简单的轮训来做负载均衡,后面有空了,再加入一致性哈希等方法吧!

到现在,服务发现已经有了,下面就可以先做一个简单的推荐系统,把流程跑起来了!

后面计划先做一个只有简单召回的推荐系统,然后再慢慢优化整套系统。

兄弟们,奥利给!


微信公众号:算法工厂
- END -


以上是关于「推荐系统从0到1」服务发现的主要内容,如果未能解决你的问题,请参考以下文章

为啥尽管源代码没有变化,但从一个系统到另一个系统的片段数量却有很大差异?

详解工业级推荐系统从0到1的构建

C#程序员经常用到的10个实用代码片段 - 操作系统

没有算法资源,产品经理如何从0到1搭建推荐系统?

「实践」Yoo视频底层页推荐系统从0到1的实践

好课推荐:百万级电商系统从0到1架构设计资料完整高清完整