golang RabbitMQ重新连接功能的示例。包括恢复已注册的消费者。 Posted 2021-05-24
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang RabbitMQ重新连接功能的示例。包括恢复已注册的消费者。相关的知识,希望对你有一定的参考价值。
package main
import (
"fmt"
"log"
"time"
)
func main() {
queue := NewQueue("amqp://guest:guest@localhost:5672/", "hello")
defer queue.Close()
queue.Consume(func(i string) {
log.Printf("Received message with second consumer: %s", i)
})
queue.Consume(func(i string) {
log.Printf("Received message with first consumer: %s", i)
})
for i := 0; i < 100; i++ {
log.Println("Sending message...")
queue.Send(fmt.Sprint("dupa", i))
time.Sleep(500 * time.Millisecond)
}
}
package main
import (
"github.com/streadway/amqp"
"log"
"time"
)
type queue struct {
url string
name string
errorChannel chan *amqp.Error
connection *amqp.Connection
channel *amqp.Channel
closed bool
consumers []messageConsumer
}
type messageConsumer func(string)
func NewQueue(url string, qName string) *queue {
q := new(queue)
q.url = url
q.name = qName
q.consumers = make([]messageConsumer, 0)
q.connect()
go q.reconnector()
return q
}
func (q *queue) Send(message string) {
err := q.channel.Publish(
"", // exchange
q.name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
logError("Sending message to queue failed", err)
}
func (q *queue) Consume(consumer messageConsumer) {
log.Println("Registering consumer...")
deliveries, err := q.registerQueueConsumer()
log.Println("Consumer registered! Processing messages...")
q.executeMessageConsumer(err, consumer, deliveries, false)
}
func (q *queue) Close() {
log.Println("Closing connection")
q.closed = true
q.channel.Close()
q.connection.Close()
}
func (q *queue) reconnector() {
for {
err := <-q.errorChannel
if !q.closed {
logError("Reconnecting after connection closed", err)
q.connect()
q.recoverConsumers()
}
}
}
func (q *queue) connect() {
for {
log.Printf("Connecting to rabbitmq on %s\n", q.url)
conn, err := amqp.Dial(q.url)
if err == nil {
q.connection = conn
q.errorChannel = make(chan *amqp.Error)
q.connection.NotifyClose(q.errorChannel)
log.Println("Connection established!")
q.openChannel()
q.declareQueue()
return
}
logError("Connection to rabbitmq failed. Retrying in 1 sec... ", err)
time.Sleep(1000 * time.Millisecond)
}
}
func (q *queue) declareQueue() {
_, err := q.channel.QueueDeclare(
q.name, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
logError("Queue declaration failed", err)
}
func (q *queue) openChannel() {
channel, err := q.connection.Channel()
logError("Opening channel failed", err)
q.channel = channel
}
func (q *queue) registerQueueConsumer() (<-chan amqp.Delivery, error) {
msgs, err := q.channel.Consume(
q.name, // queue
"", // messageConsumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
logError("Consuming messages from queue failed", err)
return msgs, err
}
func (q *queue) executeMessageConsumer(err error, consumer messageConsumer, deliveries <-chan amqp.Delivery, isRecovery bool) {
if err == nil {
if !isRecovery {
q.consumers = append(q.consumers, consumer)
}
go func() {
for delivery := range deliveries {
consumer(string(delivery.Body[:]))
}
}()
}
}
func (q *queue) recoverConsumers() {
for i := range q.consumers {
var consumer = q.consumers[i]
log.Println("Recovering consumer...")
msgs, err := q.registerQueueConsumer()
log.Println("Consumer recovered! Continuing message processing...")
q.executeMessageConsumer(err, consumer, msgs, true)
}
}
func logError(message string, err error) {
if err != nil {
log.Printf("%s: %s", message, err)
}
}
golang使用rabbitmq路由功能
路由
上一章我们讲的是一个简单的日志系统,把日志广播到每一个接受者。在这一章增加一点功能,为了节省磁盘,只有告警日志和错误日志才进行存储,其他日志就打印到控制台就可以了。
绑定
上一章我们也讲过绑定,绑定是让队列和交换器之间的关系。通俗点说就是让队列只对它所绑定的交换器中的信息感兴趣。
for _ , key := range os. Args[ 1 : ]
log. Printf ( "Binding queue %s to exchange %s with routing key %s" ,
q. Name, "logs_direct" , key)
err = ch. QueueBind ( q. Name, key, "logs_direct" , false , nil )
failOnError ( err, "Failed to bind a queue" )
绑定时的key和交换器类型有关系,如果在fanout交换器中这个值就会被忽略,而在direct交换器中的作用要看下面的内容。
Direct交换器
我们上一篇的日志系统,会把日志发送到所有的消费者。现在扩展以下,把警告和错误日志才存储到文件中,而普通信息打印到控制台即可。
fanout交换器就做不到这么灵活了,这时候就需要direct交换器。direct交换器也很简单就是把发送时带有routing key
的消息发送到通过binding key
绑定到交换器的队列中,这两个值要相同才可以发送。
多绑定
相同binding key
值可以绑定多个队列,一个队列也可以通过多个binding key
值绑定
分发日志
首先创建direct交换器
err = ch. ExchangeDeclare ( "logs_direct" , amqp. ExchangeDirect, true , false , false , false , nil )
failOnError ( err, "Failed to declare an exchange" )
然后往交换器里发送信息
body := bodyFrom ( os. Args)
err = ch. Publish ( "logs_direct" , severityFrom ( os. Args) , false , false , amqp. Publishing
DeliveryMode: amqp. Persistent,
ContentType: "text/plain" ,
Body: [ ] byte ( body) ,
)
failOnError ( err, "Failed to publish a message" )
订阅
现在进行订阅
q, err := ch. QueueDeclare ( "" , false , false , true , false , nil )
failOnError ( err, "Failed to declare a queue" )
if len ( os. Args) < 2
log. Printf ( "Usage: %s [info] [warning] [error]" , os. Args[ 0 ] )
os. Exit ( 0 )
for _ , key := range os. Args[ 1 : ]
log. Printf ( "Binding queue %s to exchange %s with routing key %s" ,
q. Name, "logs_direct" , key)
err = ch. QueueBind ( q. Name, key, "logs_direct" , false , nil )
failOnError ( err, "Failed to bind a queue" )
msgs, err := ch. Consume ( q. Name, "" , true , false , false , false , nil )
failOnError ( err, "Failed to register a consumer" )
以上是关于golang RabbitMQ重新连接功能的示例。包括恢复已注册的消费者。的主要内容,如果未能解决你的问题,请参考以下文章
golang rabbitMQ 生产者复用channel以及生产者组分发策略
golang使用rabbitmq路由功能
golang使用rabbitmq路由功能
golang使用rabbitmq路由功能
RabbitMQ(Golang版本)
RabbitMQ(Golang版本)