nsq介绍和使用

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了nsq介绍和使用相关的知识,希望对你有一定的参考价值。

参考技术A 最近一直在寻找一个高性能,高可用的消息队列做内部服务之间的通讯。一开始想到用zeromq,但在查找资料的过程中,意外的发现了Nsq这个由golang开发的消息队列,毕竟是golang原汁原味的东西,功能齐全,关键是性能还不错。其中支持动态拓展,消除单点故障等特性, 都可以很好的满足我的需求

下面上一张Nsq与其他mq的对比图,看上去的确强大。下面简单记录一下Nsq的使用方法

图片来自golang2017开发者大会

在使用Nsq服务之前,还是有必要了解一下Nsq的几个核心组件
整个Nsq服务包含三个主要部分

先看看官方的原话是怎么说:
nsqlookupd是守护进程负责管理拓扑信息。客户端通过查询 nsqlookupd 来发现指定话题(topic)的生产者,并且 nsqd 节点广播话题(topic)和通道(channel)信息

简单的说nsqlookupd就是中心管理服务,它使用tcp(默认端口4160)管理nsqd服务,使用http(默认端口4161)管理nsqadmin服务。同时为客户端提供查询功能

总的来说,nsqlookupd具有以下功能或特性

官方原话:是一套 WEB UI,用来汇集集群的实时统计,并执行不同的管理任务

总的来说,nsqadmin具有以下功能或特性

nsqadmin默认的访问地址是 http://127.0.0.1:4171/

官方原话:nsqd 是一个守护进程,负责接收,排队,投递消息给客户端

简单的说,真正干活的就是这个服务,它主要负责message的收发,队列的维护。nsqd会默认监听一个tcp端口(4150)和一个http端口(4151)以及一个可选的https端口

总的来说,nsqd 具有以下功能或特性

这是官方的图,第一个channel(meteics)因为有多个消费者,所以触发了负载均衡机制。后面两个channel由于没有消费者,所有的message均会被缓存在相应的队列里,直到消费者出现

这里想到一个问题是,如果一个channel只有生产者不停的在投递message,会不会导致服务器资源被耗尽?也许nsqd内部做了相应处理,但还是要避免这种情况的出现

了解nsqlookupd,nsqd与客户端中消费者和生产者的关系

消费者有两种方式与nsqd建立连接

生产者必须直连nsqd去投递message(网上说,可以连接到nsqlookupd,让nsqlookupd自动选择一个nsqd去完成投递,但是我用Producer的tcp是连不上nsqlookupd的,不知道http可不可以…),

这里有一个问题就是如果生产者所连接的nsqd炸了,那么message就会投递失败,所以在客户端必须自己实现相应的备用方案

执行完后检查godep是否已经安装在bin目录下,一般都会自动安装,如果没有,用go install手动安装下

如果安装成功,bin目录里就会出现一大堆nsq_…开头的可执行文件

nsqd是一个独立的服务,启动一个nsqd就可以完成message的收发,启动一个单机的nsqd,很简单

客户端可以使用http,也可以使用tcp,这里我使用是官方的go-nsq包做客户端,使用tcp进行message的收发

//Nsq发送测试

//Nsq接收测试

nsqlookupd:高性能消息中间件 NSQ 解析

摘要:本篇将会结合源码介绍 nsqlookupd 的实现细节。

本文分享自华为云社区《高性能消息中间件 NSQ 解析-nsqlookupd 实现细节介绍》,原文作者:aoho 。

本篇将会结合源码介绍 nsqlookupd 的实现细节。nsqlookupd 主要流程与nsqd 执行逻辑相似,区别在于具体运行的任务不同。

nsqlookupd是nsq管理集群拓扑信息以及用于注册和发现nsqd服务。所以,也可以把nsqlookupd理解为注册发现服务。当nsq集群中有多个nsqlookupd服务时,因为每个nsqd都会向所有的nsqlookupd上报本地信息,因此nsqlookupd具有最终一致性。

入口函数

在 nsq/apps/nsqlookupd/main.go 可以找到执行入口文件。

// 位于apps/nsqlookupd/main.go:45
func main() {
  prg := &program{}
  if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
    logFatal("%s", err)
  }
}

func (p *program) Init(env svc.Environment) error {
  if env.IsWindowsService() {
    dir := filepath.Dir(os.Args[0])
    return os.Chdir(dir)
  }
  return nil
}

func (p *program) Start() error {
  opts := nsqlookupd.NewOptions()

  flagSet := nsqlookupdFlagSet(opts)
  ...
}

同样,通过第三方 svc 包进行优雅的后台进程管理,svc.Run() -> svc.Init() -> svc.Start(),启动 nsqlookupd 实例。

// 位于 apps/nsqlookupd/main.go:80
options.Resolve(opts, flagSet, cfg)
  nsqlookupd, err := nsqlookupd.New(opts)
  if err != nil {
    logFatal("failed to instantiate nsqlookupd", err)
  }
  p.nsqlookupd = nsqlookupd

  go func() {
    err := p.nsqlookupd.Main()
    if err != nil {
      p.Stop()
      os.Exit(1)
    }
  }()

初始化配置参数(优先级:flagSet-命令行参数 > cfg-配置文件 > opts-默认值),开启协程,进入 nsqlookupd.Main() 主函数。

监听请求

我们来看下 nsqlookupd 是如何监听请求的,代码实现如下:

// 位于 nsqlookupd/nsqlookupd.go:53
func (l *NSQLookupd) Main() error {
  ctx := &Context{l}

  exitCh := make(chan error)
  var once sync.Once
  exitFunc := func(err error) {
    once.Do(func() {
      if err != nil {
        l.logf(LOG_FATAL, "%s", err)
      }
      exitCh <- err
    })
  }

  tcpServer := &tcpServer{ctx: ctx}
  l.waitGroup.Wrap(func() {
    exitFunc(protocol.TCPServer(l.tcpListener, tcpServer, l.logf))
  })
  httpServer := newHTTPServer(ctx)
  l.waitGroup.Wrap(func() {
    exitFunc(http_api.Serve(l.httpListener, httpServer, "HTTP", l.logf))
  })

  err := <-exitCh
  return err
}

开启 goroutine 执行 tcpServer, httpServer,分别监听 nsqd, nsqadmin 的客户端请求。

处理请求

// 位于 internal/protocol/tcp_server.go:17
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
  logf(lg.INFO, "TCP: listening on %s", listener.Addr())

  for {
    clientConn, err := listener.Accept()
    if err != nil {
      if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
        logf(lg.WARN, "temporary Accept() failure - %s", err)
        runtime.Gosched()
        continue
      }
      // theres no direct way to detect this error because it is not exposed
      if !strings.Contains(err.Error(), "use of closed network connection") {
        return fmt.Errorf("listener.Accept() error - %s", err)
      }
      break
    }
    go handler.Handle(clientConn)
  }

  logf(lg.INFO, "TCP: closing %s", listener.Addr())

  return nil
}

TCPServer 循环监听客户端请求,建立长连接进行通信,并开启 handler 处理每一个客户端 conn。

装饰 http 路由

httpServer 通过 http_api.Decorate 装饰器实现对各 http 路由进行 handler 装饰,如加 log 日志、V1 协议版本号的统一格式输出等;

func newHTTPServer(ctx *Context) *httpServer {
  log := http_api.Log(ctx.nsqlookupd.logf)

  router := httprouter.New()
  router.HandleMethodNotAllowed = true
  router.PanicHandler = http_api.LogPanicHandler(ctx.nsqlookupd.logf)
  router.NotFound = http_api.LogNotFoundHandler(ctx.nsqlookupd.logf)
  router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqlookupd.logf)
  s := &httpServer{
    ctx:    ctx,
    router: router,
  }

  router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
  router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))

  // v1 negotiate
  router.Handle("GET", "/debug", http_api.Decorate(s.doDebug, log, http_api.V1))
  router.Handle("GET", "/lookup", http_api.Decorate(s.doLookup, log, http_api.V1))
  router.Handle("GET", "/topics", http_api.Decorate(s.doTopics, log, http_api.V1))
  router.Handle("GET", "/channels", http_api.Decorate(s.doChannels, log, http_api.V1))
}

处理客户端命令

tcp 解析 V1 协议,内部协议封装的 prot.IOLoop(conn) 进行循环处理客户端命令,直到客户端命令全部解析处理完毕才关闭连接。

var prot protocol.Protocol
  switch protocolMagic {
  case "  V1":
    prot = &LookupProtocolV1{ctx: p.ctx}
  default:
    protocol.SendResponse(clientConn, []byte("E_BAD_PROTOCOL"))
    clientConn.Close()
    p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) bad protocol magic \'%s\'",
      clientConn.RemoteAddr(), protocolMagic)
    return
  }

  err = prot.IOLoop(clientConn)

执行命令

通过内部协议进行 p.Exec(执行命令)、p.SendResponse(返回结果),保证每个 nsqd 节点都能正确的进行服务注册(register)与注销(unregister),并进行心跳检测(ping)节点的可用性,确保客户端取到的 nsqd 节点列表都是最新可用的。

for {
    line, err = reader.ReadString(\'n\')
    if err != nil {
      break
    }

    line = strings.TrimSpace(line)
    params := strings.Split(line, " ")

    var response []byte
    response, err = p.Exec(client, reader, params)
    if err != nil {
      ctx := ""
      if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
        ctx = " - " + parentErr.Error()
      }
      _, sendErr := protocol.SendResponse(client, []byte(err.Error()))
      if sendErr != nil {
        p.ctx.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
        break
      }
      continue
    }

    if response != nil {
      _, err = protocol.SendResponse(client, response)
      if err != nil {
        break
      }
    }
  }

  conn.Close()

nsqlookupd 服务同时开启 tcp 和 http 两个监听服务,nsqd 会作为客户端,连上 nsqlookupd 的 tcp 服务,并上报自己的 topic 和 channel 信息,以及通过心跳机制判断 nsqd 状态;还有个 http 服务提供给 nsqadmin 获取集群信息。

小结

本文主要介绍 nsqlookupd 的实现,nsqlookupd 同样是一个守护进程,负责管理拓扑信息。客户端通过查询 nsqlookupd 来发现指定话题( topic )的生产者,并且 nsqd 节点广播话题(topic)和通道( channel )信息。有两个接口: TCP 接口, nsqd 用它来广播。 HTTP 接口,客户端用它来发现和管理。

下一篇文章,将会继续介绍 nsq 中其他模块实现的细节。

点击关注,第一时间了解华为云新鲜技术~

以上是关于nsq介绍和使用的主要内容,如果未能解决你的问题,请参考以下文章

nsqlookupd:高性能消息中间件 NSQ 解析

高性能消息中间件 nsq 解析-介绍

nsq 启动流程讲解

golang使用Nsq

NSQ

NSQ