Golang操作Rabbitmq
Posted 玩家_名狱
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Golang操作Rabbitmq相关的知识,希望对你有一定的参考价值。
简介
Rabbitmq就是一个消息的中间人,负责接受消息、转发消息和存储消息,并不参与消息的处理和计算。
没有固定哪一方是生产者,哪一方是消费者,谁发送消息给Rabbitmq谁就是生产者,谁接收Rabbitmq发送的消息谁就是消费者。Rabbitmq这个词是描述整个软件,而缓存消息的是这个软件内部的队列(或者说缓冲器,先进先出结构),内部可以后很多条队列,由于不参与计算,所以队列的性能只受内存和磁盘的约束。
一个生产者可以对应一个或者多个队列,一个队列可以对应一个或者多个消费者,并且传输过程中又很多处理方法,因此有很多种玩法,目前为止有七种,simple、work queues、publish / subscribe、routing、topics、rpcpublisher confirms。
使用Golang语言操作Rabbitmq,先安装Golang第三方库
go get github.com/streadway/amqp
然后在当前工程目录下使用命令初始化项目
go mod init "main"
导入依赖。go工具会读取工程代码中使用到的第三方库名字,然后到GOPATH下找到该第三方库,然后拷贝到当前项目下
go mod vendor
描述工作模型时,常用几个符号表示他们
一、simple 简单模型
创建两个go文件,一个send.go是生产者用于发送消息到Rabbitmq,另一个receive.go是消费者用于接收Rabbitmq的消息
生产者send.go
的内容如下:
- 首先连接Rabbitmq服务
- 然后打开通道,之后都是使用通道进行消息的发送和接收
- 发送的消息都是存到队列中的,因此要先创建有队列
- 最后注册生产者,同时往队列中发送消息
// send.go
package main
import (
"log"
"github.com/streadway/amqp"
)
// 定义错误输出
func failOnError(err error, msg string)
if err != nil
log.Fatalf("%s:%s", msg, err)
func main()
// 连接Rabbitmq服务。协议://用户名:密码@主机IP:端口
conn, err := amqp.Dial("amqp://zhong:12345678@localhost:5672")
failOnError(err, "连接失败")
defer conn.Close()
// 打开通道
ch, err := conn.Channel()
failOnError(err, "打开通道失败")
defer ch.Close()
// 创建一个队列,存在则不创建
q, err := ch.QueueDeclare(
"simple", // 定义队列的名字
false, // 队列是否持久化保存到硬盘。队列里面的数据是否保存得取决于发布者发布消息时的设置
false, // 没有消费者使用时是否删除
false, // 是否排他性
false, // 是否无需等待
nil, // 其他参数
)
failOnError(err, "创建队列失败")
// 发布消息到队列。注册生产者
message := "Hello world !!! My name is Mingyu."
err = ch.Publish(
"", // 交换的名字,空表示默认
q.Name, // 路由键,也是队列的标识
false, // 返回值,必填
false, // 立即
amqp.Publishing
ContentType: "text/plain", // 消息的格式
Body: []byte(message), // 发送的消息是字节数组类型
)
failOnError(err, "发布消息失败")
运行之后如下图所示,一运行就结束了,我们不知道它是否执行成功与否
接着到图形界面看看,发现有一个队列了,队列的名字就是我们创建的simple
消费者receive.go
的内容如下:
- 连接、打开通道和创建队列,都和生产者一致
- 然后创建消费者,取出队列中的消息
- 然后异步读取取得的消息,如果是同步的方式读取,会导致执行到读取函数了,而数据还在网络中传输
- 阻塞主进程,只要message的数据没有读取到,主进程就不会退出。这样的话不管是不是异步读取,都能确保读取到数据
package main
import (
"log"
"github.com/streadway/amqp"
)
// 定义错误输出
func failOnErrorConsume(err error, msg string)
if err != nil
log.Fatalf("%s:%s", msg, err)
func main()
// 连接Rabbitmq服务。协议://用户名:密码@主机IP:端口
conn, err := amqp.Dial("amqp://zhong:12345678@localhost:5672")
failOnErrorConsume(err, "连接失败")
defer conn.Close()
// 打开通道
ch, err := conn.Channel()
failOnErrorConsume(err, "打开通道失败")
defer ch.Close()
// 创建一个队列,存在则不创建
// 注意:如果生产者已创建队列,那么没问题;如果消费者先启动,那么该队列就不存在,就会报错退出
// 因此即使在消费者中也创建队列,大不了存在了就不用创建
q, err := ch.QueueDeclare(
"simple", // 定义队列的名字
false, // 队列是否持久化保存到硬盘。队列里面的数据是否保存得取决于发布者发布消息时的设置
false, // 没有消费者使用时是否删除
false, // 是否排他性
false, // 是否无需等待
nil, // 其他参数
)
failOnErrorConsume(err, "创建队列失败")
// 从队列获取消息。注册消费者
message, err := ch.Consume(
q.Name, // 队列名
"", // 消费者
true, // 自动确认
false, // 排他性
false, // 非本地
false, // 无需等待
nil, // 参数
)
failOnErrorConsume(err, "注册消费者失败")
// 异步读取消息
go func()
for data := range message
log.Fatalf("返回的消息为:%s", data.Body)
()
// 创建通道,并监听。用于阻塞主进程,使得主进程不退出
forever := make(chan bool)
log.Printf("[*] 开始监听消息")
<-forever
运行之后结果如下,能读取到队列的消息
再运行一次,发现读取不到。
因为我们已经消费了该数据,那么队列就会删除该数据。
消费者读取数据之后,就会给队列发送确认信息,队列收到确认消息之后就删除该数据,只要不收到确认消息,队列就一直保存该信息。
因此,如果我们需要该数据被消费一次之后就要删除,就可以使用自动确认的方式;如果我们需要该数据被消费多次才删除,我们就取消使用自动确认的方式,也就是设置为false,然后需要确认删除时,调用data.Ack(false)函数进行手动确认
二、work queues 工作队列模型
该模型主要应用于I/O密集任务的分发,同时解决了削峰问题。比如http请求图片,一个还没什么,但是大量的话一个服务器就承受不了,此时我们可以把http请求转化为消息都放到队列中,然后由服务器请求队列中的消息,最后解析并运行该消息的任务。
生产者send_task.go
的内容如下:
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnErrorPublish(err error, msg string)
if err != nil
log.Fatalf("%s:%s", msg, err)
func main()
// 连接Rabbitmq服务。协议://用户名:密码@主机IP:端口
conn, err := amqp.Dial("amqp://zhong:12345678@localhost:5672")
failOnErrorPublish(err, "连接失败")
defer conn.Close()
// 打开通道
ch, err := conn.Channel()
failOnErrorPublish(err, "打开通道失败")
defer ch.Close()
// 创建一个队列,存在则不创建
q, err := ch.QueueDeclare(
"work_queues", // 定义队列的名字
false, // 队列是否持久化保存到硬盘。队列里面的数据是否保存得取决于发布者发布消息时的设置
false, // 没有消费者连接使用时是否删除
false, // 是否排他性。queue的connection断了,那么这个队列就被删除了
false, // 是否无需等待
nil, // 其他参数
)
failOnErrorPublish(err, "创建队列失败")
// 发布消息到队列。注册生产者
request_string := `GET /images/tutu.png HTTP/1.1
Host: baidu.com
Connection: keep-alive
Pragma: no-cache`
err = ch.Publish(
"", // 交换的名字,空表示默认
q.Name, // 路由键,也是队列的标识
false, // 返回值,必填
false, // 立即
amqp.Publishing
DeliveryMode: amqp.Persistent, // 该消息进行持久化。若为Transient代表不持久化
ContentType: "text/plain", // 消息的格式
Body: []byte(request_string), // 发送的消息是字节数组类型
)
failOnErrorPublish(err, "发布消息失败")
先别启动send_task.go,先启动两个下方的receive_work.go,再启动send_task.go,观察。。。
消费者receive_work.go
的内容如下:
package main
import (
"log"
"strings"
"github.com/streadway/amqp"
)
func failOnErrorConsume(err error, msg string)
if err != nil
log.Fatalf("%s:%s", msg, err)
func main()
// 连接Rabbitmq服务。协议://用户名:密码@主机IP:端口
conn, err := amqp.Dial("amqp://zhong:12345678@localhost:5672")
failOnErrorConsume(err, "连接失败")
defer conn.Close()
// 打开通道
ch, err := conn.Channel()
failOnErrorConsume(err, "打开通道失败")
defer ch.Close()
// 创建一个队列,存在则不创建
// 注意:如果生产者已创建队列,那么没问题;如果消费者先启动,那么该队列就不存在,就会报错
// 因此即使在消费者中也创建队列,大不了存在就不用创建
q, err := ch.QueueDeclare(
"work_queues", // 定义队列的名字
false, // 队列是否持久化保存到硬盘。队列里面的数据是否保存得取决于发布者发布消息时的设置
false, // 没有消费者连接使用时是否删除
false, // 是否排他性。queue的connection断了,那么这个队列就被删除了
false, // 是否无需等待
nil, // 其他参数
)
failOnErrorConsume(err, "创建队列失败")
// 从队列获取消息。注册消费者
message, err := ch.Consume(
q.Name, // 队列名
"", // 消费者
true, // 自动确认
false, // 排他性
false, // 非本地
false, // 无需等待
nil, // 参数
)
failOnErrorConsume(err, "注册消费者失败")
// 异步读取消息
go func()
for data := range message
one_lines := strings.Split(string(data.Body), "\\n")[0] // 截取第一行
request_path := strings.Split(one_lines, " ")[1] // 截取第一行中间的字符串
log.Fatalf("客户端请求的资源为:%s", request_path)
()
// 创建通道,并监听。用于阻塞主进程,使得主进程不退出
forever := make(chan bool)
log.Printf("[*] 开始监听消息")
<-forever
三、publish / subscribe 发布订阅模型
生产者可以自定义发送消息到那一台交换机上,并由交换机根据匹配规则发送消息到哪些队列中,然后消费者再消费队列中的消息。这里的交换机并不是网路中的交换机设备,而是Rabbitmq内部实现的交换器。在之前的两种模型中,使用的都是默认交换机,而这里我们使用自定义的交换机。这种模型和工作队列模型最大的区别是,这个模型可以发布消息到指定的队列中。
交换机把消息发送的一个队列,或发送到多个队列,或丢弃该消息,都是根据交换类型(匹配规则)来执行,交换类型由四种:direct、topic、headers、fanout,下面使用fanout类型工作,之后逐一介绍其它类型。fanout类型会把所有消息广播到交换机知道的所有队列,队列是通过绑定的方式绑定到某个交换机的。
下面实现一个日志记录系统,用于记录用户的访问行为。用户不可能一直在线,因此我们希望消息发送到队列之后,消费者消费了消息之后,就断开消费者和队列连接,然后该队列被删除。如果你对前面的队列创建很熟悉,完成这个功能很简单,但需要提的是我们创建队列时,可以指定名称,如果为空即不指定,那么系统就会随机生成一个名字。
为了完成这个功能,我们在创建队列时会指定第三个参数为true,也就是如果消费者没有连接到该队列,该队列就会被删除,因此我们需要在消费者中创建队列,而不是生产者中创建队列,生产者代码中可以省略创建队列,并绑定队列到交换机的代码,这些代码都在消费者代码中完成,并且首先运行的是消费者代码,然后再运行生产者代码。
生产者publish_log.go
内容如下:
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnErrorPublish(err error, msg string)
if err != nil
log.Fatalf("%s:%s", msg, err)
func main()
// 连接Rabbitmq服务。协议://用户名:密码@主机IP:端口
conn, err := amqp.Dial("amqp://zhong:12345678@localhost:5672")
failOnErrorPublish(err, "连接失败")
defer conn.Close()
// 打开通道
ch, err := conn.Channel()
failOnErrorPublish(err, "打开通道失败")
defer ch.Close()
// 发布消息到队列。注册生产者
request_string := "屌丝访问了女明星出轨页面"
err = ch.Publish(
"logs", // 交换的名字,空表示默认
"", // 路由键,也是队列的标识
false, // 返回值,必填
false, // 立即
amqp.Publishing
ContentType: "text/plain", // 消息的格式
Body: []byte(request_string), // 发送的消息是字节数组类型
)
failOnErrorPublish(err, "发布消息失败")
消费者subscribe_log.go
内容如下:
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnErrorConsume(err error, msg string)
if err != nil
log.Fatalf("%s:%s", msg, err)
func main()
// 连接Rabbitmq服务。协议://用户名:密码@主机IP:端口
conn, err := amqp.Dial("amqp://zhong:12345678@localhost:5672")
failOnErrorConsume(err, "连接失败")
defer conn.Close()
// 打开通道
ch, err := conn.Channel()
failOnErrorConsume(err, "打开通道失败")
defer ch.Close()
// 创建交换机
err = ch.ExchangeDeclare(
"logs", // 交换机的名字
"fanout", // 交换类型
true, //该交换机是否持久化
false, // 自动删除
false, // 内部
false, // 无需等待
nil, // 参数
)
// 创建一个队列,存在则不创建
q, err := ch.QueueDeclare(
"", // 定义队列的名字
false, // 队列是否持久化保存到硬盘。队列里面的数据是否保存得取决于发布者发布消息时的设置
false, // 没有消费者连接使用时是否删除
true, // 是否排他性。queue的connection断了,那么这个队列就被删除了
false, // 是否无需等待
nil, // 其他参数
)
failOnErrorConsume(err, "创建队列失败")
// 绑定队列到交换机
err = ch.QueueBind(
q.Name, // 队列的名字
"", // 路由键
"logs", // 交换机
false, // 无需等待
nil,
)
// 从队列获取消息。注册消费者
message, err := ch.Consume(
q.Name, // 队列名
"", // 消费者
true, // 自动确认
false, // 排他性
false, // 非本地
false, // 无需等待
nil, // 参数
)
failOnErrorConsume(err, "注册消费者失败")
// 异步读取消息
go func()
for data := range message
log.Fatalf("日志记录:%s", data.Body)
()
// 创建通道,并监听。用于阻塞主进程,使得主进程不退出
forever := make(chan bool)
log.Printf("[*] 开始监听消息")
<-forever
先启动消费者代码,启动两个,代表有两个随机生成的队列,它们都开始监听,然后运行生产者代码,就可以发现两个消费者都收到了消息
四、routing 路由模型
在发布订阅模型中,生产者发布的消息使用了fanout类型很笨的发布到了绑定某交换机的所有队列,可我们希望错误类型的日志发布到一个队列,调试类型的日志发布到另一个队列,因此我们要有选择的把消息通过交换机发布到指定的队列。这里使用的交换类型是direct类型,而不是fanout类型
这个过程使用到了路由键。在前面的代码中,生产者发布消息时,第二个参数需要指定路由键,消费者的队列绑定交换机时,第二个参数需要指定路由键。这个路由键也就是名字,自己任意定义的,如果这两个名字相同,就说明生产者发布的消息就发布到这个队列中。如果指定发送的路由键不存在,那么该消息就会被丢弃。
生产者publish_errlog.go
的内容如下:
这里的代码中,我把发布消息函数抽出来了,
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnErrorPublish(err error, msg string)
if err != nil
log.Fatalf("%s:%s", msg, err)
func myPublish(ch *amqp.Channel, logtype string, message string)
err := ch.Publish(
"logs_direct", // 交换的名字,空表示默认
logtype, // 路由键,也是队列的标识
false, // 返回值,必填
false, // 立即
amqp.Publishing
ContentType: "text/plain", // 消息的格式
Body: []byte(message), // 发送的消息是字节数组类型
)
failOnErrorPublish(err, "发布消息失败")
func main()
// 连接Rabbitmq服务。协议://用户名:密码@主机IP:端口
conn, err := amqp.Dial("amqp://zhong:12345678@localhost:5672")
failOnErrorPublish(err, "连接失败")
defer conn.Close()
// 打开通道
ch, err := conn.Channel()
failOnErrorPublish(err, "打开通道失败")
defer ch.Close()
// 创建交换机
err = ch.ExchangeDeclare(
"logs_direct",
"direct",
true,
false,
false,
false,
nil,
)
// 发布消息到队列。注册生产者
logerrtype1 := "[debug]屌丝访问了女明星出轨页面"
logerrtype2 := "[error]数据库连接异常"
myPublish(ch, "debug", logerrtype1)
myPublish(ch, "error", logerrtype2)
消费者subscribe_errlog.go
内容如下:
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnErrorConsume(err error, msg string)
if err != nil
log.Fatalf("%s:%s", msg, err)
func main()
// 连接Rabbitmq服务。协议://用户名:密码@主机IP:端口
conn, err := amqp.golang 操作 Redis & Mysql & RabbitMQ
Go语言学习笔记golang 操作 Redis & Mysql & RabbitMQ