Golang 自动重新连接 RabbitMQ 消费者
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了 Golang 自动重新连接 RabbitMQ 消费者的相关知识,希望对你有一定的参考价值。
package base
import (
"errors"
"fmt"
"github.com/manucorporat/try"
"github.com/simpleton/beego"
"github.com/streadway/amqp"
"math/rand"
"model/helper"
"os"
"runtime"
"time"
"sync/atomic"
)
// Consumer bundles everything one RabbitMQ consumer needs: the settings
// used to (re)establish the connection, the live connection and channel,
// and the bookkeeping that Handle relies on.
//
// A Consumer is tied to exactly one exchange. That should not be a
// limitation in practice; needing several exchanges from one consumer
// usually means something else is structured improperly.
type Consumer struct {
	// Connection settings.
	uri          string // URI of the RabbitMQ server
	exchange     string // exchange that queues are bound to
	exchangeType string // topic, direct, etc.
	consumerTag  string // name the consumer identifies itself to the server with

	// Live AMQP state: assigned by Connect, reset to nil by Close.
	conn    *amqp.Connection
	channel *amqp.Channel

	// done receives an error each time the connection is reported closed.
	done chan error

	// lastRecoverTime is a Unix timestamp in seconds: the creation time at
	// first, then the time of the most recent Recover call made by Handle.
	lastRecoverTime int64

	// currentStatus holds a bool tracking the service status: true after
	// creation and after a Recover, false after a handler failure or a
	// lost connection.
	currentStatus atomic.Value
}
// RECOVER_INTERVAL_TIME is the minimum gap, in seconds, that Handle leaves
// between two Recover calls; it is compared against a difference of Unix
// timestamps.
const RECOVER_INTERVAL_TIME = 6 * 60
// newConsumer builds a Consumer that is ready to be passed to Connect.
//
// The consumer tag sent to the server is consumerTag followed by the local
// hostname, or by "_sim" when the hostname cannot be determined. The done
// channel is unbuffered, the recover timestamp starts at the current time and
// the status flag starts as true. conn and channel are left nil.
func newConsumer(consumerTag, uri, exchange, exchangeType string) *Consumer {
	host, hostErr := os.Hostname()
	if hostErr != nil {
		host = "_sim"
	}

	c := new(Consumer)
	c.consumerTag = fmt.Sprintf("%s%s", consumerTag, host)
	c.uri = uri
	c.exchange = exchange
	c.exchangeType = exchangeType
	c.done = make(chan error)
	c.lastRecoverTime = time.Now().Unix()
	c.currentStatus.Store(true)
	return c
}
// maxParallelism reports how many goroutines can usefully run in parallel:
// the smaller of the current GOMAXPROCS setting and the number of CPUs.
func maxParallelism() int {
	limit := runtime.NumCPU()
	// GOMAXPROCS(0) only queries the current value, it does not change it.
	if procs := runtime.GOMAXPROCS(0); procs < limit {
		limit = procs
	}
	return limit
}
// RunConsumer connects to RabbitMQ, declares and binds queueName on the
// given exchange, and then hands every delivery body to handler.
//
// The server address and credentials are read from the beego application
// config keys "mqAccount", "mqPassword" and "mqAddress". Connection and
// queue-setup errors are passed to helper.FailOnError.
// NOTE(review): helper.FailOnError is defined elsewhere; presumably it
// aborts on a non-nil error — confirm.
//
// The call ends in Consumer.Handle, whose loop has no exit path, so
// RunConsumer is not expected to return.
func RunConsumer(consumerTag, exchange, exchangeType, queueName, routingKey string, handler func([]byte) bool) {
	account := beego.AppConfig.String("mqAccount")
	password := beego.AppConfig.String("mqPassword")
	address := beego.AppConfig.String("mqAddress")
	uri := fmt.Sprintf("amqp://%s:%s@%s/", account, password, address)

	c := newConsumer(consumerTag, uri, exchange, exchangeType)

	connectErr := c.Connect()
	if connectErr != nil {
		helper.FailOnError(connectErr, fmt.Sprintf("[%s]connect error", consumerTag))
	}

	deliveries, announceErr := c.AnnounceQueue(queueName, routingKey)
	helper.FailOnError(announceErr, fmt.Sprintf("[%s]Error when calling AnnounceQueue()", consumerTag))

	workers := maxParallelism()
	c.Handle(deliveries, handler, workers, queueName, routingKey)
}
// ReConnect tears down the current connection, waits, and then tries to
// connect and start consuming again. Handle calls it after the connection
// has been reported closed.
//
// The wait is 15 seconds plus a random 0-59 seconds of jitter plus 2 seconds
// per retry attempt. Retrying faster would mostly flood the error log while
// the servers are still coming back online.
//
// queueName and routingKey are forwarded to AnnounceQueue; retryTime is the
// attempt counter supplied by the caller.
//
// On success it returns the new delivery channel. On failure the channel is
// nil and the error describes which step failed.
func (c *Consumer) ReConnect(queueName, routingKey string, retryTime int) (<-chan amqp.Delivery, error) {
	c.Close()

	delay := time.Duration(15+rand.Intn(60)+2*retryTime) * time.Second
	time.Sleep(delay)

	beego.Info("Try ReConnect with times:", retryTime)
	if err := c.Connect(); err != nil {
		return nil, err
	}

	deliveries, err := c.AnnounceQueue(queueName, routingKey)
	if err != nil {
		// Keep the underlying cause; it used to be replaced by a bare
		// "Couldn't connect", which hid why the queue setup failed.
		return nil, fmt.Errorf("Couldn't connect: %s", err)
	}
	return deliveries, nil
}
// Connect dials the RabbitMQ server at c.uri, opens a channel and declares
// the durable exchange c.exchange of type c.exchangeType.
//
// c.conn and c.channel are only assigned once every step has succeeded. If
// the channel cannot be opened or the exchange cannot be declared, the
// freshly dialed connection is closed and an error naming the failed step is
// returned.
//
// After a successful setup a goroutine waits for the connection to close and
// then sends an error on c.done, which tells Handle to reconnect.
func (c *Consumer) Connect() error {
	beego.Info("dialing: ", c.uri)
	conn, err := amqp.Dial(c.uri)
	if err != nil {
		return fmt.Errorf("Dial: %s", err)
	}

	beego.Info("got Connection, getting Channel")
	channel, err := conn.Channel()
	if err != nil {
		// Best-effort cleanup; the setup error is the one worth reporting.
		conn.Close()
		return fmt.Errorf("Channel: %s", err)
	}

	beego.Info("got Channel, declaring Exchange ", c.exchange)
	if err = channel.ExchangeDeclare(
		c.exchange,     // name of the exchange
		c.exchangeType, // type
		true,           // durable
		false,          // delete when complete
		false,          // internal
		false,          // noWait
		nil,            // arguments
	); err != nil {
		conn.Close()
		return fmt.Errorf("Exchange Declare: %s", err)
	}

	c.conn, c.channel = conn, channel

	// The watcher is only started for a fully set-up connection. Starting it
	// earlier meant that a half-initialised connection, once closed by the
	// next ReConnect, pushed a stale signal into c.done and made Handle tear
	// down the healthy connection that followed.
	// The notification channel is buffered so the library never blocks on it,
	// and the goroutine uses the local conn rather than re-reading c.conn,
	// which Close may reset to nil.
	closed := conn.NotifyClose(make(chan *amqp.Error, 1))
	go func() {
		// Waits here until the connection is closed.
		beego.Info("closing: ", <-closed)
		// Tell Handle it is time to reconnect.
		c.done <- errors.New("Channel Closed")
	}()
	return nil
}
// AnnounceQueue declares the durable queue queueName, applies the prefetch
// limit, binds the queue to c.exchange under routingKey and starts consuming
// from it with manual acknowledgement.
//
// It returns the delivery channel, or nil and an error naming the step that
// failed. Connect must have succeeded first, since c.channel is used
// without a nil check.
func (c *Consumer) AnnounceQueue(queueName, routingKey string) (<-chan amqp.Delivery, error) {
	const (
		durable    = true
		autoDelete = false // delete when unused
		exclusive  = false
		noWait     = false
		autoAck    = false // noAck
		noLocal    = false

		// prefetchCount is how many messages the server hands over before
		// it waits for acks. A small value slows consumption but gives more
		// certainty that every message is processed. As load grows, prefer
		// raising the number of threads/processors the Go process uses
		// before touching this; eventually a balance between threads,
		// procs and Qos has to be found.
		prefetchCount = 50
	)
	ch := c.channel

	beego.Info("declared Exchange, declaring Queue:", queueName)
	q, declErr := ch.QueueDeclare(queueName, durable, autoDelete, exclusive, noWait, nil)
	if declErr != nil {
		return nil, fmt.Errorf("Queue Declare: %s", declErr)
	}

	summary := fmt.Sprintf("declared Queue (%q %d messages, %d consumers), binding to Exchange (key %q)",
		q.Name, q.Messages, q.Consumers, routingKey)
	beego.Info(summary)

	if qosErr := ch.Qos(prefetchCount, 0, false); qosErr != nil {
		return nil, fmt.Errorf("Error setting qos: %s", qosErr)
	}

	if bindErr := ch.QueueBind(q.Name, routingKey, c.exchange, noWait, nil); bindErr != nil {
		return nil, fmt.Errorf("Queue Bind: %s", bindErr)
	}

	beego.Info("Queue bound to Exchange, starting Consume consumer tag:", c.consumerTag)
	deliveries, consumeErr := ch.Consume(q.Name, c.consumerTag, autoAck, exclusive, noLocal, noWait, nil)
	if consumeErr != nil {
		return nil, fmt.Errorf("Queue Consume: %s", consumeErr)
	}
	return deliveries, nil
}
// Close shuts down the channel and then the connection, skipping whichever
// is nil, and resets both fields to nil. Errors from the underlying Close
// calls are ignored.
func (c *Consumer) Close() {
	if ch := c.channel; ch != nil {
		ch.Close()
		c.channel = nil
	}
	if conn := c.conn; conn != nil {
		conn.Close()
		c.conn = nil
	}
}
// Handle processes deliveries with fn and reconnects when the connection is
// lost. Its outer loop has no exit path, so it does not return.
//
// Each pass of the outer loop starts `threads` goroutines that range over
// the current deliveries channel, then blocks until a value arrives on
// c.done. A non-nil value marks the status as false and calls ReConnect
// repeatedly until it succeeds; the next pass starts workers on the new
// deliveries channel.
//
// fn receives the message body and reports success. On true the message is
// acked. On false (ret also stays false if fn never sets it) the message is
// neither acked nor nacked and the status flag is set to false.
//
// queue and routingKey are only forwarded to ReConnect.
func (c *Consumer) Handle(
	deliveries <-chan amqp.Delivery,
	fn func([]byte) bool,
	threads int,
	queue string,
	routingKey string) {
	var err error
	for {
		beego.Info("Enter for busy loop with thread:", threads)
		for i := 0; i < threads; i++ {
			go func() {
				beego.Info("Enter go with thread with deliveries", deliveries)
				// The range expression is evaluated once, so this worker
				// stays on the channel it started with and ends when that
				// channel is closed.
				for msg := range deliveries {
					beego.Info("Enter deliver")
					ret := false
					// NOTE(review): the exact This/Finally/Catch semantics
					// come from github.com/manucorporat/try; presumably a
					// panic in fn is reported through Catch — confirm.
					try.This(func() {
						body := msg.Body[:]
						ret = fn(body)
					}).Finally(func() {
						if ret == true {
							msg.Ack(false)
							currentTime := time.Now().Unix()
							// After an earlier failure (status false), and at most once per
							// RECOVER_INTERVAL_TIME seconds, call Recover(true) on the channel.
							// NOTE(review): presumably this asks the server to redeliver the
							// messages left unacked — confirm against the amqp documentation.
							// NOTE(review): lastRecoverTime is read and written here by several
							// worker goroutines without synchronization, and c.channel is set
							// to nil by Close during a reconnect — verify both are safe.
							if currentTime-c.lastRecoverTime > RECOVER_INTERVAL_TIME && !c.currentStatus.Load().(bool) {
								beego.Info("Try to Recover Unack Messages!")
								c.currentStatus.Store(true)
								c.lastRecoverTime = currentTime
								c.channel.Recover(true)
							}
						} else {
							// Nacking with requeue is a little dangerous: if the worker panics
							// very quickly, the redelivery loop would flood our Sentry server.
							// Please add a [retry-ttl] header before enabling it.
							//msg.Nack(false, true)
							c.currentStatus.Store(false)
						}
					}).Catch(func(e try.E) {
						helper.SentryError(e)
					})
				}
			}()
		}
		// Go into the reconnect loop when
		// c.done is passed a non-nil value.
		if <-c.done != nil {
			c.currentStatus.Store(false)
			retryTime := 1
			// Retry until ReConnect succeeds; the attempt counter increases
			// the delay inside ReConnect.
			for {
				deliveries, err = c.ReConnect(queue, routingKey, retryTime)
				if err != nil {
					helper.FailOnError(err, "Reconnecting Error")
					retryTime += 1
				} else {
					break
				}
			}
		}
		// Logged after every value received from c.done, whether or not a
		// reconnect actually took place.
		beego.Info("Reconnected!!!")
	}
}
golang RabbitMQ重新连接功能的示例。包括恢复已注册的消费者。
package main
import (
"fmt"
"log"
"time"
)
// main demonstrates the reconnecting queue: it registers two consumers on
// the "hello" queue of a local RabbitMQ server and then publishes 100
// messages, one every 500 milliseconds.
func main() {
	q := NewQueue("amqp://guest:guest@localhost:5672/", "hello")
	defer q.Close()

	q.Consume(func(body string) {
		log.Printf("Received message with second consumer: %s", body)
	})
	q.Consume(func(body string) {
		log.Printf("Received message with first consumer: %s", body)
	})

	const pause = 500 * time.Millisecond
	for n := 0; n < 100; n++ {
		log.Println("Sending message...")
		q.Send(fmt.Sprint("dupa", n))
		time.Sleep(pause)
	}
}
package main
import (
"github.com/streadway/amqp"
"log"
"time"
)
// queue is a RabbitMQ queue client that re-establishes its connection and
// re-registers its consumers after the connection is closed.
type queue struct {
	// Where to connect and which queue to use.
	url  string
	name string

	// Live AMQP state, replaced on every (re)connect.
	connection   *amqp.Connection
	channel      *amqp.Channel
	errorChannel chan *amqp.Error // close notifications for connection

	// closed is set by Close so that the reconnector stops reconnecting.
	closed bool

	// consumers holds every successfully registered callback so that it can
	// be registered again after a reconnect.
	consumers []messageConsumer
}
// messageConsumer is a callback that receives the body of each delivery,
// converted to a string.
type messageConsumer func(string)
// NewQueue connects to the RabbitMQ server at url, prepares the queue named
// qName and starts the background goroutine that reconnects when the
// connection closes. It blocks until a connection has been established,
// because connect retries until dialing succeeds.
func NewQueue(url string, qName string) *queue {
	q := &queue{
		url:       url,
		name:      qName,
		consumers: make([]messageConsumer, 0),
	}
	q.connect()
	go q.reconnector()
	return q
}
// Send publishes message as a text/plain body to the default exchange, using
// the queue name as the routing key. A publish error is logged, not returned.
func (q *queue) Send(message string) {
	payload := amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(message),
	}
	const (
		exchange  = ""
		mandatory = false
		immediate = false
	)
	err := q.channel.Publish(exchange, q.name, mandatory, immediate, payload)
	logError("Sending message to queue failed", err)
}
// Consume registers consumer on the queue and starts feeding it messages in
// a background goroutine. If registration fails, the error has already been
// logged by registerQueueConsumer and the consumer is not started.
func (q *queue) Consume(consumer messageConsumer) {
	log.Println("Registering consumer...")
	msgs, regErr := q.registerQueueConsumer()
	// This line is logged whether or not the registration succeeded.
	log.Println("Consumer registered! Processing messages...")
	q.executeMessageConsumer(regErr, consumer, msgs, false)
}
// Close marks the queue as closed, so the reconnector does not reconnect,
// and then closes the channel and the connection.
//
// Either handle may be nil (openChannel stores nil when opening the channel
// fails), so both are checked before use instead of being dereferenced
// blindly. Errors from the underlying Close calls are deliberately ignored:
// this is best-effort shutdown.
func (q *queue) Close() {
	log.Println("Closing connection")
	// Must be set before the connection is closed: closing it wakes the
	// reconnector, which reads this flag.
	q.closed = true
	if q.channel != nil {
		q.channel.Close()
	}
	if q.connection != nil {
		q.connection.Close()
	}
}
// reconnector runs in its own goroutine. It waits for a close notification
// on the current connection, then reconnects and re-registers all
// consumers. It returns once the queue has been closed through Close.
func (q *queue) reconnector() {
	for {
		err := <-q.errorChannel
		if q.closed {
			// Close was called on purpose. Returning is essential: the
			// notification channel is closed together with the connection,
			// so every further receive would return immediately and this
			// loop would spin forever.
			return
		}
		if err != nil {
			logError("Reconnecting after connection closed", err)
		} else {
			// A nil *amqp.Error must not be passed to logError: wrapped in
			// an error interface it would no longer compare equal to nil.
			log.Println("Reconnecting after connection closed")
		}
		q.connect()
		q.recoverConsumers()
	}
}
// connect dials q.url until a connection with an open channel is available,
// pausing one second between attempts. It then registers for close
// notifications and declares the queue. It only returns once that has
// succeeded, so it can block for as long as the server is unreachable.
func (q *queue) connect() {
	const retryDelay = 1000 * time.Millisecond
	for {
		log.Printf("Connecting to rabbitmq on %s\n", q.url)
		conn, err := amqp.Dial(q.url)
		if err != nil {
			logError("Connection to rabbitmq failed. Retrying in 1 sec... ", err)
			time.Sleep(retryDelay)
			continue
		}
		q.connection = conn
		log.Println("Connection established!")

		q.openChannel()
		if q.channel == nil {
			// openChannel has already logged the failure. Carrying on would
			// dereference the nil channel in declareQueue, so drop this
			// connection (best effort) and start over.
			conn.Close()
			time.Sleep(retryDelay)
			continue
		}

		// Register for close notifications only once the connection is
		// usable, so a connection abandoned above never has a listener that
		// nobody reads from.
		q.errorChannel = make(chan *amqp.Error)
		q.connection.NotifyClose(q.errorChannel)

		q.declareQueue()
		return
	}
}
// declareQueue declares the queue q.name as non-durable, non-auto-deleted
// and non-exclusive. A failure is logged, not returned.
func (q *queue) declareQueue() {
	const (
		durable    = false
		autoDelete = false // delete when unused
		exclusive  = false
		noWait     = false
	)
	_, declErr := q.channel.QueueDeclare(q.name, durable, autoDelete, exclusive, noWait, nil)
	logError("Queue declaration failed", declErr)
}
// openChannel opens a channel on the current connection and stores it in
// q.channel. A failure is logged; the returned channel value is stored in
// q.channel regardless of the error.
func (q *queue) openChannel() {
	var openErr error
	q.channel, openErr = q.connection.Channel()
	logError("Opening channel failed", openErr)
}
// registerQueueConsumer starts an auto-acknowledging consumer on q.name with
// an empty consumer tag. It returns the delivery channel together with any
// error, which is also logged.
func (q *queue) registerQueueConsumer() (<-chan amqp.Delivery, error) {
	const (
		consumerTag = ""
		autoAck     = true
		exclusive   = false
		noLocal     = false
		noWait      = false
	)
	deliveries, consumeErr := q.channel.Consume(q.name, consumerTag, autoAck, exclusive, noLocal, noWait, nil)
	logError("Consuming messages from queue failed", consumeErr)
	return deliveries, consumeErr
}
// executeMessageConsumer starts a goroutine that passes the body of every
// delivery to consumer as a string; the goroutine ends when deliveries is
// closed.
//
// It does nothing when err is non-nil. Unless isRecovery is set, the
// consumer is also remembered in q.consumers so that recoverConsumers can
// register it again after a reconnect.
func (q *queue) executeMessageConsumer(err error, consumer messageConsumer, deliveries <-chan amqp.Delivery, isRecovery bool) {
	if err != nil {
		return
	}
	if !isRecovery {
		q.consumers = append(q.consumers, consumer)
	}
	go func() {
		for d := range deliveries {
			consumer(string(d.Body))
		}
	}()
}
// recoverConsumers registers every remembered consumer again on the current
// channel and restarts its message loop. It is called after a reconnect.
func (q *queue) recoverConsumers() {
	for _, consumer := range q.consumers {
		log.Println("Recovering consumer...")
		deliveries, regErr := q.registerQueueConsumer()
		// This line is logged whether or not the registration succeeded.
		log.Println("Consumer recovered! Continuing message processing...")
		q.executeMessageConsumer(regErr, consumer, deliveries, true)
	}
}
// logError logs "<message>: <err>" through the standard logger when err is
// non-nil, and does nothing otherwise.
func logError(message string, err error) {
	if err == nil {
		return
	}
	log.Printf("%s: %s", message, err)
}
以上是关于golang Golang自动重新连接rabbitmq消费者的主要内容,如果未能解决你的问题,请参考以下文章
golang 连接、操作完mysql, 对mysql的连接会自动关闭,还是必须要手动关闭?
golang RabbitMQ重新连接功能的示例。包括恢复已注册的消费者。
解决golang使用elastic连接elasticsearch时自动转换连接地址
golang 如何连接redis --- 2022-04-03