RabbitMQ(Golang版本)

Posted 会跳舞的哈密瓜

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ(Golang版本)相关的知识,希望对你有一定的参考价值。

一、RabbitMQ 简介

消息队列是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是/用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

二、使用场景

1.解耦

场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。

在这里插入图片描述

传统模式的缺点:

  • 如果库存系统出现故障,订单系统当收到订单时也会出现问题
  • 订单与库存系统之间高耦合

采用消息队列后

在这里插入图片描述

  • 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
  • 库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
  • 假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦
  • 为了保证库存肯定有,可以将队列大小设置成库存数量,或者采用其他方式解决。

2.异步

场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种 1.串行的方式;2.并行方式

(1)串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端

在这里插入图片描述

(2)并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间

在这里插入图片描述

(3)引入消息队列,将不是必须的业务逻辑,异步处理。

在这里插入图片描述

3.流量削峰

一般应用于秒杀活动中,因为秒杀活动一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。

作用: 1.可以控制活动人数,超过此一定阀值的订单直接丢弃 2.可以缓解短时间的高流量压垮应用

4.缺点

  • 系统的可用性降低

    例如系统只需要调用abc三个接口,现在引入了mq之后虽然之前调用都没有出错,系统可以正常运行,但是mq出现问题的话,整个系统也同样会受影响

  • 系统的复杂性提高

    需要考虑消息是否丢失,还需要考虑消息传递的顺序

  • 一致性问题

    系统发送完消息返回成功,但是abc中若有系统写入失败,就会产生数据不一致的问题

三、docker安装rabbitmq

1.先将rabbitmq和erlang的软件包上传至阿里云服务器,然后解压安装

rpm -Uvh esl-erlang_23.0-1_centos_7_amd64.rpm
yum install erlang
yum install -y socat
rpm -Uvh rabbitmq-server-3.8.14-1.el7.noarch.rpm
yum install rabbit-server -y

2.启动rabbitmq-server

systemctl start rabbitmq-server
systemctl status rabbitmq-server # 查看状态,

3.添加超级用户(web管理界面)

rabbitmqctl add_user admin admin        # 添加用户  用户名 密码
 rabbitmqctl set_user_tags admin administrator        # 修改用户权限 用户名 权限
 rabbitmqctl change_password admin admin   # 修改密码

4.运行rabbitmq

docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 8030:15672 -p 8031:5672 -p 8032:25672 -p 8033:61613 -p 8034:1883 rabbitmq:management

四、工作模式

(一).simple模式(即最简单的收发模式)

在这里插入图片描述

1.消息产生消息,将消息放入队列

2.消息的消费者(consumer) 监听 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)。

//send.go

package main

import (
  "log"

  "github.com/streadway/amqp"
)

func FailOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}

func main() {
  //链接rabbitmq server
  conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  FailOnError(err, "Failed to connect to RabbitMQ")
  defer conn.Close()
  //创建一个通道,用于完成任务
  ch,err := conn.Channel()
  FailOnError(err,"Failed to open a channel")
  defer ch.Close()
  //声明一个队列,用于消息发送
  q, err := ch.QueueDeclare(
    "hello world", // 队列名
    false,   // 持久化
    false,   // 是否自动删除
    false,   // 排他性
    false,   // no-wait
    nil,     // 附属参数
  )
  FailOnError(err, "Failed to declare a queue")
  body := "Hello World!"
  err = ch.Publish(    // 发送消息(生产者)
    "",     // exchange
    q.Name, // routing key
    false,  // mandatory
    false,  // immediate
    amqp.Publishing{
      ContentType: "text/plain",
      Body:        []byte(body),
    })
  FailOnError(err, "Failed to publish a message")
  log.Printf(" [x] Sent %s", body)
}

//receive.go
package main

import (
  "log"

  "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}

func main() {
  //连接rabbitmq server
  conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  failOnError(err, "Failed to connect to RabbitMQ")
  defer conn.Close()

  //创建一个通道
  ch, err := conn.Channel()
  failOnError(err, "Failed to open a channel")
  defer ch.Close()

  //声明队列
  q, err := ch.QueueDeclare(
    "hello world", // name
    false,   // durable
    false,   // delete when unused
    false,   // exclusive
    false,   // no-wait
    nil,     // arguments
  )
  failOnError(err, "Failed to declare a queue")

  msgs, err := ch.Consume(  // 注册一个消费者(接收消息)
    q.Name, // queue
    "",     // consumer
    true,   // auto-ack
    false,  // exclusive
    false,  // no-local
    false,  // no-wait
    nil,    // args
  )
  failOnError(err, "Failed to register a consumer")

  forever := make(chan bool)

  go func() {
    for d := range msgs {
      log.Printf("Received a message: %s", d.Body)
    }
  }()

  log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
  <-forever
}

(二).work工作模式(资源的竞争)

在这里插入图片描述

1.消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize) 保证一条消息只能被一个消费者使用)。

2.消息确认

完成一个任务需要消耗时间,如果一个消费者开始了一个任务并在完成期间关闭或者死亡,我们就会丢失他刚刚处理的消息,同时还将丢失所有已经派发给特定worker但未处理的信息。我们可以使用消息确认来避免这样的事情发生,消费者发回一个 ack(nowledgement) 来告诉 RabbitMQ 一个特定的消息已经被接收、处理并且 RabbitMQ 可以自由地删除它。如果消费者在没有发送 ack 的情况下死亡(其通道关闭、连接关闭或 TCP 连接丢失),RabbitMQ 将理解消息未完全处理并将重新排队。如果有其他消费者同时在线,它会迅速将其重新交付给另一个消费者。这样您就可以确保不会丢失任何消息,即使工作人员偶尔会死亡。

3.消息持久性

RabbitMQ服务器停止,如果没有持久化,任务就会丢失,我们只需要在声明队列时将durable参数设置为true,同时将amqp.Publishing中加入DeliveryMode: amqp.Persistent,即可实现持久化。这样即使我们rabbit重启,我们的队列也不会丢失

//new_task.go

package main

import (
  "log"
  "os"
  "strings"

  "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}

func main() {

  conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  failOnError(err, "Failed to connect to RabbitMQ")
  defer conn.Close()

  ch, err := conn.Channel()
  failOnError(err, "Failed to open a channel")
  defer ch.Close()

  q, err := ch.QueueDeclare(
    "work queues", // name
    true,          // durable
    false,         // delete when unused
    false,         // exclusive
    false,         // no-wait
    nil,           // arguments
  )
  failOnError(err, "Failed to declare a queue")

  body := bodyFrom(os.Args)

  err = ch.Publish(
    "",     // exchange  这里使用默认交换机,消息是通过交换机传给队列
    q.Name, // routing key
    false,  // mandatory
    false,
    amqp.Publishing{
      DeliveryMode: amqp.Persistent,
      ContentType:  "text/plain",
      Body:         []byte(body),
    })
  failOnError(err, "Failed to publish a message")
  log.Printf(" [x] Sent %s", body)

}

//获取运行时的参数
//例如:go run main.go 1 3 -X ?    输出如下:
//参数0: /tmp/go-build116558042/command-line-arguments/_obj/exe/main
//参数1: 1
//参数2: 3
//参数3: -X
//参数4: ?
func bodyFrom(args []string) string {
  var s string
  if (len(args) < 2) || os.Args[1] == "" { //args长度小于2 或者参数为空的时候
    s = "hello"
  } else {
    s = strings.Join(args[1:], " ") //Join 连接其第一个参数的元素以创建单个字符串。分隔符字符串 sep 放置在结果字符串中的元素之间
  }
  return s
}

//work.go
package main

import (
  "bytes"
  "log"
  "time"

  "github.com/streadway/amqp"
)

func FailOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}

func main() {

  conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  FailOnError(err, "Failed to connect to RabbitMQ")
  defer conn.Close()

  ch, err := conn.Channel()
  FailOnError(err, "Failed to open a channel")
  defer ch.Close()

  q, err := ch.QueueDeclare(
    "work queues", // name
    true,         // durable
    false,        // delete when unused
    false,        // exclusive
    false,        // no-wait
    nil,          // arguments
  )
  FailOnError(err, "Failed to declare a queue")

  err = ch.Qos(
    1,     // prefetch count 服务器将在收到确认之前将那么多消息传递给消费者。
    0,     // prefetch size  服务器将尝试在收到消费者的确认之前至少将那么多字节的交付保持刷新到网络
    false, // 当 global 为 true 时,这些 Qos 设置适用于同一连接上所有通道上的所有现有和未来消费者。当为 false 时,Channel.Qos 设置将应用于此频道上的所有现有和未来消费者
  )
  FailOnError(err, "Failed to set QoS")

  msgs, err := ch.Consume(
    q.Name, // queue
    "",     // consumer
    false,  // auto-ack
    false,  // exclusive
    false,  // no-local
    false,  // no-wait
    nil,    // args
  )
  FailOnError(err, "Failed to register a consumer")

  forever := make(chan bool)

  go func() {
    for d := range msgs {
      log.Printf("Received a message: %s", d.Body)
      // func Count(s, sep [] byte ) int  计算s中sep的非重叠实例数。如果sep是空切片,则Count返回1+s中UTF-8编码的代码点数
      dotCount := bytes.Count(d.Body, []byte("."))  // 返回 . 的个数
      t := time.Duration(dotCount)  // 表示为 int64 纳秒计数
      time.Sleep(t * time.Second)  //将当前 goroutine 暂停至少持续时间d
      log.Printf("Done")
      d.Ack(false)     // 必须在成功处理此交付后调用 Delivery.Ack,如果autoAck为true则不需要. 参数 true表示回复当前信道所有未回复的ack,用于批量确认。false表示回复当前条目
    }
  }()

  log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
  <-forever

}

启动两个终端用于执行work.go,一个终端用来执行new_task,相当于newtask是生产者,work是两个消费者,两个消费者共同消费消息,(轮询访问,每个消费者消费的次数相同,公平分发,处理速度快的消费者消费的多,能者多劳)

(三).publish/subscribe发布订阅(共享资源)

在这里插入图片描述

1、每个消费者监听自己的队列;

2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。

//emit_log.go
package main

import (
  "log"
  "os"
  "strings"

  "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}

func main() {
  conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  failOnError(err, "Failed to connect to RabbitMQ")
  defer conn.Close()

  ch, err := conn.Channel()
  failOnError(err, "Failed to open a channel")
  defer ch.Close()

  err = ch.ExchangeDeclare(
    "logs",   // name  交换机名字
    "fanout", // type  交换机模式 广播到每个队列
    true,     // durable  持久化
    false,    // auto-deleted
    false,    // internal
    false,    // no-wait
    nil,      // arguments
  )
  failOnError(err, "Failed to declare an exchange")

  body := bodyFrom(os.Args)
  err = ch.Publish(
    "logs", // exchange  生产者将消息发往logs交换机
    "",     // routing key
    false,  // mandatory
    false,  // immediate
    amqp.Publishing{
      ContentType: "text/plain",
      Body:        []byte(body),
    })
  failOnError(err, "Failed to publish a message")

  log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
  var s string
  if (len(args) < 2) || os.Args[1] == "" {
    s = "hello"
  } else {
    s = strings.Join(args[1:], " ")
  }
  return s
}
//receive_log.go
package main

import (
  "log"

  "github.com/streadway/amqp"
)

func FailOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}

func main() {
  conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  FailOnError(err, "Failed to connect to RabbitMQ")
  defer conn.Close()

  ch, err := conn.Channel()
  FailOnError(err, "Failed to open a channel")
  defer ch.Close()

  err = ch.ExchangeDeclare(
    "logs",   // name
    "fanout", // type
    true,     // durable
    false,    // auto-deleted
    false,    // internal
    false,    // no-wait
    nil,      // arguments
  )
  FailOnError(err, "Failed to declare an exchange")

  q, err := ch.QueueDeclare(
    "",    // name
    false, // durable
    false, // delete when unused
    true,  // exclusive
    false, // no-wait
    nil,   // arguments
  )
  FailOnError(err, "Failed to declare a queue")

  err = ch.QueueBind(  //声明的队列要和交换机绑定
    q.Name, // queue name
    "",     // routing key
    "logs", // exchange
    false,
    nil)
  FailOnError(err, "Failed to bind a queue")

  msgs, err := ch.Consume(
    q.Name, // queue
    "",     // consumer
    true,   // auto-ack
    false,  // exclusive
    false,  // no-local
    false,  // no-wait
    nil,    // args
  )
  FailOnError(err, "Failed to register a consumer")

  forever := make(chan bool)

  go func() {
    for d := range msgs {
      log.Printf(" [x] %s", d.Body)
    }
  }()

  log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
  <-forever
}

//测试
//启动两个消费者,打开两个终端
go run receive_log.go 
//启动一个生产者
 go run emit_log.go // 看到两个消费者的终端输出hello
 go run emit_log.go 1  // 两个消费者的终端输出1

(四).routing路由模式

在这里插入图片描述

1.消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;

2.根据业务功能定义路由字符串

3.从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。

4.业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误;

//emit_log_direct.go
以上是关于RabbitMQ(Golang版本)的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ(Golang版本)

golang goroutine例子[golang并发代码片段]

golang代码片段(摘抄)

代码片段 - Golang 实现简单的 Web 服务器

代码片段 - Golang 实现集合操作

golang Golang自动重新连接rabbitmq消费者