golang RabbitMQ重新连接功能的示例。包括恢复已注册的消费者。

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang RabbitMQ重新连接功能的示例。包括恢复已注册的消费者。相关的知识,希望对你有一定的参考价值。

package main

import (
	"fmt"
	"log"
	"time"
)

func main() {
	queue := NewQueue("amqp://guest:guest@localhost:5672/", "hello")
	defer queue.Close()

	queue.Consume(func(i string) {
		log.Printf("Received message with second consumer: %s", i)
	})

	queue.Consume(func(i string) {
		log.Printf("Received message with first consumer: %s", i)
	})

	for i := 0; i < 100; i++ {
		log.Println("Sending message...")
		queue.Send(fmt.Sprint("dupa", i))
		time.Sleep(500 * time.Millisecond)
	}
}
package main

import (
	"github.com/streadway/amqp"
	"log"
	"time"
)

type queue struct {
	url  string
	name string

	errorChannel chan *amqp.Error
	connection   *amqp.Connection
	channel      *amqp.Channel
	closed       bool

	consumers []messageConsumer
}

type messageConsumer func(string)

func NewQueue(url string, qName string) *queue {
	q := new(queue)
	q.url = url
	q.name = qName
	q.consumers = make([]messageConsumer, 0)

	q.connect()
	go q.reconnector()

	return q
}

func (q *queue) Send(message string) {
	err := q.channel.Publish(
		"",     // exchange
		q.name, // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		})
	logError("Sending message to queue failed", err)
}

func (q *queue) Consume(consumer messageConsumer) {
	log.Println("Registering consumer...")
	deliveries, err := q.registerQueueConsumer()
	log.Println("Consumer registered! Processing messages...")
	q.executeMessageConsumer(err, consumer, deliveries, false)
}

func (q *queue) Close() {
	log.Println("Closing connection")
	q.closed = true
	q.channel.Close()
	q.connection.Close()
}

func (q *queue) reconnector() {
	for {
		err := <-q.errorChannel
		if !q.closed {
			logError("Reconnecting after connection closed", err)

			q.connect()
			q.recoverConsumers()
		}
	}
}

func (q *queue) connect() {
	for {
		log.Printf("Connecting to rabbitmq on %s\n", q.url)
		conn, err := amqp.Dial(q.url)
		if err == nil {
			q.connection = conn
			q.errorChannel = make(chan *amqp.Error)
			q.connection.NotifyClose(q.errorChannel)

			log.Println("Connection established!")

			q.openChannel()
			q.declareQueue()

			return
		}

		logError("Connection to rabbitmq failed. Retrying in 1 sec... ", err)
		time.Sleep(1000 * time.Millisecond)
	}
}

func (q *queue) declareQueue() {
	_, err := q.channel.QueueDeclare(
		q.name, // name
		false,  // durable
		false,  // delete when unused
		false,  // exclusive
		false,  // no-wait
		nil,    // arguments
	)
	logError("Queue declaration failed", err)
}

func (q *queue) openChannel() {
	channel, err := q.connection.Channel()
	logError("Opening channel failed", err)
	q.channel = channel
}

func (q *queue) registerQueueConsumer() (<-chan amqp.Delivery, error) {
	msgs, err := q.channel.Consume(
		q.name, // queue
		"",     // messageConsumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	logError("Consuming messages from queue failed", err)
	return msgs, err
}

func (q *queue) executeMessageConsumer(err error, consumer messageConsumer, deliveries <-chan amqp.Delivery, isRecovery bool) {
	if err == nil {
		if !isRecovery {
			q.consumers = append(q.consumers, consumer)
		}
		go func() {
			for delivery := range deliveries {
				consumer(string(delivery.Body[:]))
			}
		}()
	}
}

func (q *queue) recoverConsumers() {
	for i := range q.consumers {
		var consumer = q.consumers[i]

		log.Println("Recovering consumer...")
		msgs, err := q.registerQueueConsumer()
		log.Println("Consumer recovered! Continuing message processing...")
		q.executeMessageConsumer(err, consumer, msgs, true)
	}
}

func logError(message string, err error) {
	if err != nil {
		log.Printf("%s: %s", message, err)
	}
}

golang使用rabbitmq路由功能

路由

上一章我们讲的是一个简单的日志系统,把日志广播到每一个接受者。在这一章增加一点功能,为了节省磁盘,只有告警日志和错误日志才进行存储,其他日志就打印到控制台就可以了。

绑定

上一章我们也讲过绑定,绑定是让队列和交换器之间的关系。通俗点说就是让队列只对它所绑定的交换器中的信息感兴趣。

for _, key := range os.Args[1:] 
		log.Printf("Binding queue %s to exchange %s with routing key %s",
			q.Name, "logs_direct", key)
		err = ch.QueueBind(q.Name, key, "logs_direct", false, nil)
		failOnError(err, "Failed to bind a queue")
	

绑定时的key和交换器类型有关系,如果在fanout交换器中这个值就会被忽略,而在direct交换器中的作用要看下面的内容。

Direct交换器

我们上一篇的日志系统,会把日志发送到所有的消费者。现在扩展以下,把警告和错误日志才存储到文件中,而普通信息打印到控制台即可。

fanout交换器就做不到这么灵活了,这时候就需要direct交换器。direct交换器也很简单就是把发送时带有routing key的消息发送到通过binding key绑定到交换器的队列中,这两个值要相同才可以发送。

多绑定

相同binding key值可以绑定多个队列,一个队列也可以通过多个binding key值绑定

分发日志

首先创建direct交换器

err = ch.ExchangeDeclare("logs_direct", amqp.ExchangeDirect, true, false, false, false, nil)
	failOnError(err, "Failed to declare an exchange")

然后往交换器里发送信息

body := bodyFrom(os.Args)
	err = ch.Publish("logs_direct", severityFrom(os.Args), false, false, amqp.Publishing
		DeliveryMode: amqp.Persistent,
		ContentType:  "text/plain",
		Body:         []byte(body),
	)
	failOnError(err, "Failed to publish a message")

订阅

现在进行订阅


	q, err := ch.QueueDeclare("", false, false, true, false, nil)
	failOnError(err, "Failed to declare a queue")

	if len(os.Args) < 2 
		log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])
		os.Exit(0)
	

	for _, key := range os.Args[1:] 
		log.Printf("Binding queue %s to exchange %s with routing key %s",
			q.Name, "logs_direct", key)
		err = ch.QueueBind(q.Name, key, "logs_direct", false, nil)
		failOnError(err, "Failed to bind a queue")
	

	msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
	failOnError(err, "Failed to register a consumer")

以上是关于golang RabbitMQ重新连接功能的示例。包括恢复已注册的消费者。的主要内容,如果未能解决你的问题,请参考以下文章

golang rabbitMQ 生产者复用channel以及生产者组分发策略

golang使用rabbitmq路由功能

golang使用rabbitmq路由功能

golang使用rabbitmq路由功能

RabbitMQ(Golang版本)

RabbitMQ(Golang版本)