Golang使用amqp发送消息
Posted 疯狂奔跑
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Golang使用amqp发送消息相关的知识,希望对你有一定的参考价值。
1.为什么使用信道(channel)而不使用TCP连接发送AMQP命令?
对操作系统来说频繁的建立和销毁TCP连接开销非常昂贵,而操作系统每秒建立的连接是有上限的,性能瓶颈不可避免,而只建立一条TCP连接无疑是一个很好的方案,在这条连接当中建立多条信道与RabbitMQ进行私密通信,相当于光纤电缆一样,一条电缆有多条光束,信道是没有限制的
2.队列
1)AMQP的命令basic.consume与basic.get
如果需要消息一到达队列就自动接收的话,应该使用basic.consume
basic.get会订阅消息,获得单条消息,然后取消订阅,值得注意的是不应该循环basic.get来替代basic.consume,应该理性使用basic.consume实现高吞吐量
消息如果到达无人订阅的队列,消息会在队列中等待,知道有消费者订阅到该队列
2)接收消息
消费者确认消息告诉RabbitMQ已正确接收RabbitMQ会安全的把消息从队列上删除,
如果消费者收到一条消息,确认之前RabbitMQ断开了连接(或者从队列上取消订阅),RabbitMQ会认为这条消息没有被分发,然后分发给下一个订阅者。如果应用程序处理消息耗时,则可以延迟确认消息,防止源源不断的消息涌入
3)拒绝接收
消息未确认之前
a.消费者从RabbitMQ服务器断开连接,它会将消息入队并发送给下一个消费者,但是这种连接/断开方式会增加服务器负担
b.可以使用basic.reject命令拒绝接收消息,参数为true,会发送给下一个消费者,false时,会把消息从队列中移除不会分发给下一个消费者
4)队列设置
如果想拥有私人队列只为一个消费者服务,可以设置exclusive参数为true
如果需要临时队列和结合exclusive和auto_delete,auto_delete在消费者取消订阅时,会自动删除,都设置为true
3.三种基本交换类型
direct交换器非常简单:如果路由键匹配的话,消息就会被投递到相应的队列当中;
fanout交换器会将收到的消息广播到绑定的队列上;
topic交换器使得来自不同源头的消息能够到达同一个队列。
4.虚拟主机(vhost)的作用
a.逻辑分离允许为不同应用程序安全保密的运行数据,将rabbit的众多客户区分开来,避免队列和交换器的命名冲突;
b.权限控制以vhost为单位;
c.vhost之间是绝对隔离的,无法将vhost1上的交换器绑定到vhost2中的队列去;
d.可以安全的迁移到新的RabbitMQ服务器上处理新的负载,不会有任何命名冲突
e.vhost不仅消除了在基础架构中为每一层都运行一个RabbitMQ服务器,也避免了为每一层创建不同集群
5.持久化的策略
durable属性决定了RabbitMQ是否需要在崩溃或者重启之后重新创建队列(或者交换器)
持久化消息三个要点:
把他的投递模式(delivery mode)选项设置为2(持久);
发送到持久化的交换器上;
到达持久化的队列。
特点:
持久性消息从服务器重启中恢复的方式是写入磁盘的持久化日志文件,当发布一条持久性消息到持久化的交换器上时,消息提交到日志文件后才会响应;
持久性消息如果路由到了非持久化的队列当中,会自动从持久性日志中移除,无法从服务器重启中恢复;
一旦被正确消费(经过确认后),RabbitMQ会在持久化日志中将这条消息标记为等待垃圾收集。在消费之前,如果重启,服务器会重建交换器和队列以及绑定,重播持久性日志文件中的消息到合适的队列或者交换器上,这取决与宕机时消息处在哪个环节上
6.解决事务的方案:发送方确认模式
由于AMQP内部事务对性能有很大瓶颈,现采取发送方确认模式保证事务,将信道设置为confirm模式,所有在此信道上发布的消息都会有一个唯一的ID号,当被投递到匹配的队列时,信道就会发送一个发送方确认模式给生产者应用程序,这个模式是异步的,应用程序可以等待确认的同时继续发送下一条,但如果是持久化的消息,会在写入磁盘之后消息发出。
如果发送内部错误而导致消息丢失,RabbitMQ会发送一条nack(not acknowledged,未确认)消息,这种模式下每分钟可追踪数以百万计的消息投递
7. 性能特点
7.1 可靠性(Reliability)
RabbitMQ提供很多特性供我们可以在性能和可靠性作出折中的选择,包括持久化、发送确认、发布者确认和高可用性等。
7.2 弹性选路(Flexible Routing)
消息在到达队列前通过交换(exchanges)来被选路。RabbitMQ为典型的选路逻辑设计了几个内置的交换类型。对于更加复杂的选路,我们可以将exchanges绑定在一起或者写属于自己的exchange类型插件。
7.3 集群化(Clustering)
在一个局域网内的几个RabbitMQ服务器可以集群起来,组成一个逻辑的代理人。
7.4 联盟(Federation)
对于那些需要比集群更加松散和非可靠连接的服务器来说,RabbitMQ提供一个联盟模型(Federation Model)
7.5 高可用队列(High Available Queue)
可以在一个集群里的几个机器里对队列做镜像,确保即时发生了硬件失效,你的消息也是安全的。
7.6 多种协议(Multi-protocol)
RabbitMQ支持消息在多种协议中传输。
7.7 多客户端(Many Clients)
RabbitMQ客户端有你几乎能想象到的任何语言。
7.8 管理界面(Management UI)
RabbitMQ附带一个容易使用的管理界面,允许您监控和控制你的消息broker服务器的方方面面。
7.9 跟踪(Tracing)
如果你的消息系统行为异常,RabbitMQ提供跟踪支持来找出错误的根源。
7.10 插件系统(Plugin System)
RabbitMQ提供各种方式的插件扩展,我们可以实现自己的插件。
使用任务队列一个优点是能够轻易地并行处理任务。当处理大量积压的任务,只要增加工作队列,通过这个方式,能够实现轻易的缩放。
发送代码:
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://root:[email protected]:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.Publish(
"Test", // exchange
"Agent", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
MessageId: string(1),
Type: "AgentJob",
Body: []byte("Hello world"),
})
log.Println("send ok")
failOnError(err, "Failed to publish a message")
}
接收代码:
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://root:[email protected]:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
ch.Reject()
q, err := ch.QueueDeclare(
"Agent", // name
true, // durable
true, // delete when usused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
err = ch.QueueBind("Agent", "Agent", "Test_01", false, nil)
if err != nil {
log.Println(err)
return
}
forever := make(chan bool)
go func() {
for d := range msgs {
log.Println(d.Type)
log.Println(d.MessageId)
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf("Waiting for messages. To exit press CTRL+C")
<-forever
}
本文来自:CSDN博客
以上是关于Golang使用amqp发送消息的主要内容,如果未能解决你的问题,请参考以下文章
并发之消息队列----基于AMQP实现的golang消息队列MaxQ
AMQP Qpid Proton - 无法将消息发送到超过 256 个队列