Golang操作Rabbitmq

Posted 玩家_名狱

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Golang操作Rabbitmq相关的知识,希望对你有一定的参考价值。

简介

Rabbitmq就是一个消息的中间人,负责接受消息、转发消息和存储消息,并不参与消息的处理和计算。

没有固定哪一方是生产者,哪一方是消费者,谁发送消息给Rabbitmq谁就是生产者,谁接收Rabbitmq发送的消息谁就是消费者。Rabbitmq这个词是描述整个软件,而缓存消息的是这个软件内部的队列(或者说缓冲器,先进先出结构),内部可以后很多条队列,由于不参与计算,所以队列的性能只受内存和磁盘的约束。

一个生产者可以对应一个或者多个队列,一个队列可以对应一个或者多个消费者,并且传输过程中又很多处理方法,因此有很多种玩法,目前为止有七种,simple、work queues、publish / subscribe、routing、topics、rpcpublisher confirms。

使用Golang语言操作Rabbitmq,先安装Golang第三方库

go get github.com/streadway/amqp

然后在当前工程目录下使用命令初始化项目

go mod init "main"

导入依赖。go工具会读取工程代码中使用到的第三方库名字,然后到GOPATH下找到该第三方库,然后拷贝到当前项目下

go mod vendor

描述工作模型时,常用几个符号表示他们

x'x

一、simple 简单模型

xxx

创建两个go文件,一个send.go是生产者用于发送消息到Rabbitmq,另一个receive.go是消费者用于接收Rabbitmq的消息

生产者send.go的内容如下:

  • 首先连接Rabbitmq服务
  • 然后打开通道,之后都是使用通道进行消息的发送和接收
  • 发送的消息都是存到队列中的,因此要先创建有队列
  • 最后注册生产者,同时往队列中发送消息
// send.go
package main

import (
	"log"

	"github.com/streadway/amqp"
)

// 定义错误输出
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s:%s", msg, err)
	}
}

func main() {
	// 连接Rabbitmq服务。协议://用户名:密码@主机IP:端口
	conn, err := amqp.Dial("amqp://zhong:12345678@localhost:5672")
	failOnError(err, "连接失败")
	defer conn.Close()

	// 打开通道
	ch, err := conn.Channel()
	failOnError(err, "打开通道失败")
	defer ch.Close()

	// 创建一个队列,存在则不创建
	q, err := ch.QueueDeclare(
		"simple", // 定义队列的名字
		false,    // 队列是否持久化保存到硬盘。队列里面的数据是否保存得取决于发布者发布消息时的设置
		false,    // 没有消费者使用时是否删除
		false,    // 是否排他性
		false,    // 是否无需等待
		nil,      // 其他参数
	)
	failOnError(err, "创建队列失败")

	// 发布消息到队列。注册生产者
	message := "Hello world !!! My name is Mingyu."
	err = ch.Publish(
		"",     // 交换的名字,空表示默认
		q.Name, // 路由键,也是队列的标识
		false,  // 返回值,必填
		false,  // 立即
		amqp.Publishing{
			ContentType: "text/plain",    // 消息的格式
			Body:        []byte(message), // 发送的消息是字节数组类型
		})
	failOnError(err, "发布消息失败")
}

运行之后如下图所示,一运行就结束了,我们不知道它是否执行成功与否
xxx

接着到图形界面看看,发现有一个队列了,队列的名字就是我们创建的simple
xxx

消费者receive.go的内容如下:

  • 连接、打开通道和创建队列,都和生产者一致
  • 然后创建消费者,取出队列中的消息
  • 然后异步读取取得的消息,如果是同步的方式读取,会导致执行到读取函数了,而数据还在网络中传输
  • 阻塞主进程,只要message的数据没有读取到,主进程就不会退出。这样的话不管是不是异步读取,都能确保读取到数据
package main

import (
	"log"

	"github.com/streadway/amqp"
)

// 定义错误输出
func failOnErrorConsume(err error, msg string) {
	if err != nil {
		log.Fatalf("%s:%s", msg, err)
	}
}

func main() {
	// 连接Rabbitmq服务。协议://用户名:密码@主机IP:端口
	conn, err := amqp.Dial("amqp://zhong:12345678@localhost:5672")
	failOnErrorConsume(err, "连接失败")
	defer conn.Close()

	// 打开通道
	ch, err := conn.Channel()
	failOnErrorConsume(err, "打开通道失败")
	defer ch.Close()

	// 创建一个队列,存在则不创建
	// 注意:如果生产者已创建队列,那么没问题;如果消费者先启动,那么该队列就不存在,就会报错退出
	// 因此即使在消费者中也创建队列,大不了存在了就不用创建
	q, err := ch.QueueDeclare(
		"simple", // 定义队列的名字
		false,    // 队列是否持久化保存到硬盘。队列里面的数据是否保存得取决于发布者发布消息时的设置
		false,    // 没有消费者使用时是否删除
		false,    // 是否排他性
		false,    // 是否无需等待
		nil,      // 其他参数
	)
	failOnErrorConsume(err, "创建队列失败")

	// 从队列获取消息。注册消费者
	message, err := ch.Consume(
		q.Name, // 队列名
		"",     // 消费者
		true,   // 自动确认
		false,  // 排他性
		false,  // 非本地
		false,  // 无需等待
		nil,    // 参数
	)
	failOnErrorConsume(err, "注册消费者失败")

	// 异步读取消息
	go func() {
		for data := range message {
			log.Fatalf("返回的消息为:%s", data.Body)
		}
	}()

	// 创建通道,并监听。用于阻塞主进程,使得主进程不退出
	forever := make(chan bool)
	log.Printf("[*] 开始监听消息")
	<-forever
}

运行之后结果如下,能读取到队列的消息
xx

再运行一次,发现读取不到。

因为我们已经消费了该数据,那么队列就会删除该数据。

消费者读取数据之后,就会给队列发送确认信息,队列收到确认消息之后就删除该数据,只要不收到确认消息,队列就一直保存该信息。

因此,如果我们需要该数据被消费一次之后就要删除,就可以使用自动确认的方式;如果我们需要该数据被消费多次才删除,我们就取消使用自动确认的方式,也就是设置为false,然后需要确认删除时,调用data.Ack(false)函数进行手动确认

xxx

二、work queues 工作队列模型

xxx

该模型主要应用于I/O密集任务的分发,同时解决了削峰问题。比如http请求图片,一个还没什么,但是大量的话一个服务器就承受不了,此时我们可以把http请求转化为消息都放到队列中,然后由服务器请求队列中的消息,最后解析并运行该消息的任务。

生产者send_task.go的内容如下:

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnErrorPublish(err error, msg string) {
	if err != nil {
		log.Fatalf("%s:%s", msg, err)
	}
}

func main() {
	// 连接Rabbitmq服务。协议://用户名:密码@主机IP:端口
	conn, err := amqp.Dial("amqp://zhong:12345678@localhost:5672")
	failOnErrorPublish(err, "连接失败")
	defer conn.Close()

	// 打开通道
	ch, err := conn.Channel()
	failOnErrorPublish(err, "打开通道失败")
	defer ch.Close()

	// 创建一个队列,存在则不创建
	q, err := ch.QueueDeclare(
		"work_queues", // 定义队列的名字
		false,         // 队列是否持久化保存到硬盘。队列里面的数据是否保存得取决于发布者发布消息时的设置
		false,         // 没有消费者连接使用时是否删除
		false,         // 是否排他性。queue的connection断了,那么这个队列就被删除了
		false,         // 是否无需等待
		nil,           // 其他参数
	)
	failOnErrorPublish(err, "创建队列失败")

	// 发布消息到队列。注册生产者
	request_string := `GET /images/tutu.png HTTP/1.1
Host: baidu.com
Connection: keep-alive
Pragma: no-cache`
	err = ch.Publish(
		"",     // 交换的名字,空表示默认
		q.Name, // 路由键,也是队列的标识
		false,  // 返回值,必填
		false,  // 立即
		amqp.Publishing{
			DeliveryMode: amqp.Persistent,        // 该消息进行持久化。若为Transient代表不持久化
			ContentType:  "text/plain",           // 消息的格式
			Body:         []byte(request_string), // 发送的消息是字节数组类型
		})
	failOnErrorPublish(err, "发布消息失败")
}

先别启动send_task.go,先启动两个下方的receive_work.go,再启动send_task.go,观察。。。

消费者receive_work.go的内容如下:

package main

import (
	"log"
	"strings"

	"github.com/streadway/amqp"
)

func failOnErrorConsume(err error, msg string) {
	if err != nil {
		log.Fatalf("%s:%s", msg, err)
	}
}

func main() {
	// 连接Rabbitmq服务。协议://用户名:密码@主机IP:端口
	conn, err := amqp.Dial("amqp://zhong:12345678@localhost:5672")
	failOnErrorConsume(err, "连接失败")
	defer conn.Close()

	// 打开通道
	ch, err := conn.Channel()
	failOnErrorConsume(err, "打开通道失败")
	defer ch.Close()

	// 创建一个队列,存在则不创建
	// 注意:如果生产者已创建队列,那么没问题;如果消费者先启动,那么该队列就不存在,就会报错
	// 因此即使在消费者中也创建队列,大不了存在就不用创建
	q, err := ch.QueueDeclare(
		"work_queues", // 定义队列的名字
		false,         // 队列是否持久化保存到硬盘。队列里面的数据是否保存得取决于发布者发布消息时的设置
		false,         // 没有消费者连接使用时是否删除
		false,         // 是否排他性。queue的connection断了,那么这个队列就被删除了
		false,         // 是否无需等待
		nil,           // 其他参数
	)
	failOnErrorConsume(err, "创建队列失败")

	// 从队列获取消息。注册消费者
	message, err := ch.Consume(
		q.Name, // 队列名
		"",     // 消费者
		true,   // 自动确认
		false,  // 排他性
		false,  // 非本地
		false,  // 无需等待
		nil,    // 参数
	)
	failOnErrorConsume(err, "注册消费者失败")

	// 异步读取消息
	go func() {
		for data := range message {
			one_lines := strings.Split(string(data.Body), "\\n")[0]  // 截取第一行
			request_path := strings.Split(one_lines, " ")[1]  // 截取第一行中间的字符串
			log.Fatalf("客户端请求的资源为:%s", request_path)
		}
	}()

	// 创建通道,并监听。用于阻塞主进程,使得主进程不退出
	forever := make(chan bool)
	log.Printf("[*] 开始监听消息")
	<-forever
}

xxx

三、publish / subscribe 发布订阅模型

xxx
生产者可以自定义发送消息到那一台交换机上,并由交换机根据匹配规则发送消息到哪些队列中,然后消费者再消费队列中的消息。这里的交换机并不是网路中的交换机设备,而是Rabbitmq内部实现的交换器。在之前的两种模型中,使用的都是默认交换机,而这里我们使用自定义的交换机。这种模型和工作队列模型最大的区别是,这个模型可以发布消息到指定的队列中。

交换机把消息发送的一个队列,或发送到多个队列,或丢弃该消息,都是根据交换类型(匹配规则)来执行,交换类型由四种:directtopicheadersfanout,下面使用fanout类型工作,之后逐一介绍其它类型。fanout类型会把所有消息广播到交换机知道的所有队列,队列是通过绑定的方式绑定到某个交换机的。

下面实现一个日志记录系统,用于记录用户的访问行为。用户不可能一直在线,因此我们希望消息发送到队列之后,消费者消费了消息之后,就断开消费者和队列连接,然后该队列被删除。如果你对前面的队列创建很熟悉,完成这个功能很简单,但需要提的是我们创建队列时,可以指定名称,如果为空即不指定,那么系统就会随机生成一个名字。

为了完成这个功能,我们在创建队列时会指定第三个参数为true,也就是如果消费者没有连接到该队列,该队列就会被删除,因此我们需要在消费者中创建队列,而不是生产者中创建队列,生产者代码中可以省略创建队列,并绑定队列到交换机的代码,这些代码都在消费者代码中完成,并且首先运行的是消费者代码,然后再运行生产者代码。

生产者publish_log.go内容如下:

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnErrorPublish(err error, msg string) {
	if err != nil {
		log.Fatalf("%s:%s", msg, err)
	}
}

func main() {
	// 连接Rabbitmq服务。协议://用户名:密码@主机IP:端口
	conn, err := amqp.Dial("amqp://zhong:12345678@localhost:5672")
	failOnErrorPublish(err, "连接失败")
	defer conn.Close()

	// 打开通道
	ch, err := conn.Channel()
	failOnErrorPublish(err, "打开通道失败")
	defer ch.Close()

	// 发布消息到队列。注册生产者
	request_string := "屌丝访问了女明星出轨页面"
    err = ch.Publish(
        "logs", // 交换的名字,空表示默认
        "",     // 路由键,也是队列的标识
        false,  // 返回值,必填
        false,  // 立即
        amqp.Publishing{
            ContentType: "text/plain",           // 消息的格式
            Body:        []byte(request_string), // 发送的消息是字节数组类型
        })
    failOnErrorPublish(err, "发布消息失败")
}

消费者subscribe_log.go内容如下:

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnErrorConsume(err error, msg string) {
	if err != nil {
		log.Fatalf("%s:%s", msg, err)
	}
}

func main() {
	// 连接Rabbitmq服务。协议://用户名:密码@主机IP:端口
	conn, err := amqp.Dial("amqp://zhong:12345678@localhost:5672")
	failOnErrorConsume(err, "连接失败")
	defer conn.Close()

	// 打开通道
	ch, err := conn.Channel()
	failOnErrorConsume(err, "打开通道失败")
	defer ch.Close()

	// 创建交换机
	err = ch.ExchangeDeclare(
		"logs",   // 交换机的名字
		"fanout", // 交换类型
		true,     //该交换机是否持久化
		false,    // 自动删除
		false,    // 内部
		false,    // 无需等待
		nil,      // 参数
	)

	// 创建一个队列,存在则不创建
	q, err := ch.QueueDeclare(
		"",    // 定义队列的名字
		false, // 队列是否持久化保存到硬盘。队列里面的数据是否保存得取决于发布者发布消息时的设置
		false, // 没有消费者连接使用时是否删除
		true,  // 是否排他性。queue的connection断了,那么这个队列就被删除了
		false, // 是否无需等待
		nil,   // 其他参数
	)
	failOnErrorConsume(err, "创建队列失败")

	// 绑定队列到交换机
	err = ch.QueueBind(
		q.Name, // 队列的名字
		"",     // 路由键
		"logs", // 交换机
		false,  // 无需等待
		nil,
	)

	// 从队列获取消息。注册消费者
	message, err := ch.Consume(
		q.Name, // 队列名
		"",     // 消费者
		true,   // 自动确认
		false,  // 排他性
		false,  // 非本地
		false,  // 无需等待
		nil,    // 参数
	)
	failOnErrorConsume(err, "注册消费者失败")

	// 异步读取消息
	go func() {
		for data := range message {
			log.Fatalf("日志记录:%s", data.Body)
		}
	}()

	// 创建通道,并监听。用于阻塞主进程,使得主进程不退出
	forever := make(chan bool)
	log.Printf("[*] 开始监听消息")
	<-forever

}

先启动消费者代码,启动两个,代表有两个随机生成的队列,它们都开始监听,然后运行生产者代码,就可以发现两个消费者都收到了消息

xxx

四、routing 路由模型

xxx
在发布订阅模型中,生产者发布的消息使用了fanout类型很笨的发布到了绑定某交换机的所有队列,可我们希望错误类型的日志发布到一个队列,调试类型的日志发布到另一个队列,因此我们要有选择的把消息通过交换机发布到指定的队列。这里使用的交换类型是direct类型,而不是fanout类型

这个过程使用到了路由键。在前面的代码中,生产者发布消息时,第二个参数需要指定路由键,消费者的队列绑定交换机时,第二个参数需要指定路由键。这个路由键也就是名字,自己任意定义的,如果这两个名字相同,就说明生产者发布的消息就发布到这个队列中。如果指定发送的路由键不存在,那么该消息就会被丢弃。

生产者publish_errlog.go的内容如下:

这里的代码中,我把发布消息函数抽出来了,

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnErrorPublish(err error, msg string) {
	if err != nil {
		log.Fatalf("%s:%s", msg, err)
	}
}

func myPublish(ch *amqp.Channel, logtype string, message string) {
	err := ch.Publish(
		"logs_direct", // 交换的名字,空表示默认
		logtype,       // 路由键,也是队列的标识
		false,         // 返回值,必填
		false,         // 立即
		amqp.Publishing{
			ContentType: "text/plain",    // 消息的格式
			Body:        []byte(message), // 发送的消息是字节数组类型
		})
	failOnErrorPublish(err, "发布消息失败")

}

func main() {
	// 连接Rabbitmq服务。协议://用户名:密码@主机IP:端口
	conn, err := amqp.Dial("amqp://zhong:12345678@localhost:5672")
	failOnErrorPublish(err, "连接失败")
	defer conn.Close()

	// 打开通道
	ch, err := conn.Channel()
	failOnErrorPublish(err, "打开通道失败")
	defer ch.Close()

	// 创建交换机
	err = ch.ExchangeDeclare(
		"logs_direct",
		"direct",
		true,
		false,
		false,
		false,
		nil,
	)

	// 发布消息到队列。注册生产者
	logerrtype1 := "[debug]屌丝访问了女明星出轨页面"
	logerrtype2 := "[error]数据库连接异常"
	myPublish(ch, "debug", logerrtype1)
	myPublish(ch, "error", logerrtype2)
}

消费者subscribe_errlog.go内容如下:

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnErrorConsume(err error, msg string) {
	if err != nil {
		log.Fatalf("%s:%s", msg, err)
	}
}

func main() {
	// 连接Rabbitmq服务。协议://用户名:

以上是关于Golang操作Rabbitmq的主要内容,如果未能解决你的问题,请参考以下文章

Golang操作Rabbitmq

golang 操作 Redis & Mysql & RabbitMQ

golang goroutine例子[golang并发代码片段]

Go语言学习笔记golang 操作 Redis & Mysql & RabbitMQ

golang代码片段(摘抄)

代码片段 - Golang 实现简单的 Web 服务器