Golang:自动重新连接 RabbitMQ 消费者

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了 Golang 自动重新连接 RabbitMQ 消费者的相关知识,希望对你有一定的参考价值。

package base

import (
    "errors"
    "fmt"
    "github.com/manucorporat/try"
    "github.com/simpleton/beego"
    "github.com/streadway/amqp"
    "math/rand"
    "model/helper"
    "os"
    "runtime"
    "time"
    "sync/atomic"
)

// Consumer holds all information about one RabbitMQ connection.
//
// This setup limits a consumer to a single exchange. That should not be
// an issue: having to connect to multiple exchanges usually means
// something else is structured improperly.
type Consumer struct {
    conn         *amqp.Connection // set by Connect, cleared by Close
    channel      *amqp.Channel // opened on conn by Connect, cleared by Close
    done         chan error // Connect's watcher goroutine sends here when the connection closes; Handle receives
    consumerTag  string // Name that consumer identifies itself to the server with
    uri          string // uri of the rabbitmq server
    exchange     string // exchange that we will bind to
    exchangeType string // topic, direct, etc...

    // Unix time (seconds) of construction or of the last Recover issued by
    // Handle; compared against RECOVER_INTERVAL_TIME.
    lastRecoverTime int64
    // Tracks the current service status as a bool: stored as true on
    // construction and when Handle issues a Recover, stored as false when a
    // handler call does not succeed or the connection is lost.
    currentStatus   atomic.Value
}

// RECOVER_INTERVAL_TIME is the minimum gap, in seconds, between two Recover
// requests issued by Handle (it is compared against a difference of Unix
// timestamps). 6 * 60 is six minutes.
const RECOVER_INTERVAL_TIME = 6 * 60

// newConsumer builds a Consumer for the given server URI and exchange.
//
// The tag announced to the server is consumerTag with the local hostname
// appended, or with "_sim" appended when the hostname cannot be read.
// conn and channel stay nil until Connect is called. The done channel is
// created here, the recover timestamp starts at the current Unix time and
// the status flag starts out true.
func newConsumer(consumerTag, uri, exchange, exchangeType string) *Consumer {
    host, hostErr := os.Hostname()
    if hostErr != nil {
        host = "_sim"
    }

    c := new(Consumer)
    c.consumerTag = consumerTag + host
    c.uri = uri
    c.exchange = exchange
    c.exchangeType = exchangeType
    c.done = make(chan error)
    c.lastRecoverTime = time.Now().Unix()
    c.currentStatus.Store(true)
    return c
}

// maxParallelism reports how many goroutines can usefully run at once: the
// smaller of the current GOMAXPROCS setting and the number of logical CPUs.
func maxParallelism() int {
    limit := runtime.NumCPU()
    // GOMAXPROCS(0) only queries the setting; it does not change it.
    if procs := runtime.GOMAXPROCS(0); procs < limit {
        limit = procs
    }
    return limit
}

// RunConsumer connects to the RabbitMQ server configured under the beego
// keys mqAccount, mqPassword and mqAddress, declares queueName, binds it to
// exchange with routingKey and then hands every delivery body to handler.
//
// handler reports success with true; Handle acknowledges only those
// messages. On the normal path this function does not return, because
// Handle loops forever and reconnects as needed.
//
// If the initial connection or queue setup fails, the error is reported
// through helper.FailOnError. Should that helper return instead of aborting,
// the consumer is closed and RunConsumer returns, rather than continuing
// with a nil channel or nil delivery stream.
func RunConsumer(consumerTag, exchange, exchangeType, queueName, routingKey string, handler func([]byte) bool) {

    rabbitUri := fmt.Sprintf("amqp://%s:%s@%s/",
        beego.AppConfig.String("mqAccount"),
        beego.AppConfig.String("mqPassword"),
        beego.AppConfig.String("mqAddress"),
    )
    consumer := newConsumer(consumerTag, rabbitUri, exchange, exchangeType)

    if err := consumer.Connect(); err != nil {
        helper.FailOnError(err, fmt.Sprintf("[%s]connect error", consumerTag))
        // Only reached when FailOnError does not abort: without a channel
        // the AnnounceQueue call below would dereference nil.
        consumer.Close()
        return
    }

    deliveries, err := consumer.AnnounceQueue(queueName, routingKey)
    helper.FailOnError(err, fmt.Sprintf("[%s]Error when calling AnnounceQueue()", consumerTag))
    if err != nil {
        // Without a delivery stream the workers in Handle would block
        // forever on a nil channel.
        consumer.Close()
        return
    }
    consumer.Handle(deliveries, handler, maxParallelism(), queueName, routingKey)
}

// ReConnect tears down the current connection, waits, and then connects
// again and re-announces the queue.
//
// The wait is 15 seconds plus a random 0-59 seconds plus 2 seconds per
// retry, so that many consumers do not hit a recovering server at the same
// moment and the error log is not flooded while the server is down.
//
// queueName and routingKey are passed straight to AnnounceQueue; retryTime
// is the attempt number, used for the back-off and for logging.
//
// It returns the new delivery stream, or a nil stream and the cause when
// either the connection or the queue setup fails.
func (c *Consumer) ReConnect(queueName, routingKey string, retryTime int) (<-chan amqp.Delivery, error) {
    c.Close()
    time.Sleep(time.Duration(15+rand.Intn(60)+2*retryTime) * time.Second)
    beego.Info("Try ReConnect with times:", retryTime)

    if err := c.Connect(); err != nil {
        return nil, err
    }

    deliveries, err := c.AnnounceQueue(queueName, routingKey)
    if err != nil {
        // Keep the underlying cause in the message so the caller's log
        // shows why the queue could not be announced.
        return nil, fmt.Errorf("Couldn't connect: %s", err)
    }
    return deliveries, nil
}

// Connect dials the RabbitMQ server at c.uri, opens a channel and declares
// the durable exchange c.exchange of type c.exchangeType.
//
// A watcher goroutine is started for the new connection. When the
// connection is lost with an error it sends on c.done, which makes Handle
// start reconnecting. A close that carries no error is not reported: the
// amqp library documents that a graceful close delivers no error, and such
// a close comes from Close itself, so it must not trigger another reconnect.
//
// If the channel or the exchange cannot be set up, the half-open connection
// is closed again before the error is returned.
func (c *Consumer) Connect() error {

    var err error
    // NOTE(review): c.uri as built by RunConsumer embeds the account
    // password, so this log line exposes it.
    beego.Info("dialing: ", c.uri)
    c.conn, err = amqp.Dial(c.uri)

    if err != nil {
        return fmt.Errorf("Dial: %s", err)
    }

    // Register the listener before starting the goroutine and keep it in a
    // local, so the goroutine never reads c.conn, which Close may reset to
    // nil at any time.
    closed := c.conn.NotifyClose(make(chan *amqp.Error))
    go func() {
        // Waits here for the connection to be closed
        closeErr := <-closed
        beego.Info("closing: ", closeErr)
        if closeErr == nil {
            // Graceful close requested locally; nothing to recover from.
            return
        }
        // Let Handle know it's time to reconnect
        c.done <- errors.New("Channel Closed")
    }()

    beego.Info("got Connection, getting Channel")
    c.channel, err = c.conn.Channel()
    if err != nil {
        c.Close()
        return fmt.Errorf("Channel: %s", err)
    }

    beego.Info("got Channel, declaring Exchange ", c.exchange)
    if err = c.channel.ExchangeDeclare(
        c.exchange,     // name of the exchange
        c.exchangeType, // type
        true,           // durable
        false,          // delete when complete
        false,          // internal
        false,          // noWait
        nil,            // arguments
    ); err != nil {
        c.Close()
        return fmt.Errorf("Exchange Declare: %s", err)
    }

    return nil
}

// AnnounceQueue declares the durable queue queueName on the current
// channel, limits unacknowledged deliveries, binds the queue to c.exchange
// with routingKey and starts consuming with manual acknowledgement.
//
// It returns the delivery stream, or nil and an error naming the step that
// failed.
func (c *Consumer) AnnounceQueue(queueName, routingKey string) (<-chan amqp.Delivery, error) {
    const (
        durable    = true
        autoDelete = false
        exclusive  = false
        noWait     = false
        autoAck    = false
        noLocal    = false

        // Number of messages the server hands out before it waits for
        // acknowledgements. A lower value slows consumption but makes it
        // more certain that every message is processed. Under rising load,
        // raise the number of worker goroutines and processors first, and
        // only then tune this value.
        prefetchCount = 50
    )

    beego.Info("declared Exchange, declaring Queue:", queueName)
    declared, declareErr := c.channel.QueueDeclare(queueName, durable, autoDelete, exclusive, noWait, nil)
    if declareErr != nil {
        return nil, fmt.Errorf("Queue Declare: %s", declareErr)
    }

    summary := fmt.Sprintf("declared Queue (%q %d messages, %d consumers), binding to Exchange (key %q)",
        declared.Name, declared.Messages, declared.Consumers, routingKey)
    beego.Info(summary)

    if qosErr := c.channel.Qos(prefetchCount, 0, false); qosErr != nil {
        return nil, fmt.Errorf("Error setting qos: %s", qosErr)
    }

    bindErr := c.channel.QueueBind(declared.Name, routingKey, c.exchange, noWait, nil)
    if bindErr != nil {
        return nil, fmt.Errorf("Queue Bind: %s", bindErr)
    }

    beego.Info("Queue bound to Exchange, starting Consume consumer tag:", c.consumerTag)
    stream, consumeErr := c.channel.Consume(declared.Name, c.consumerTag, autoAck, exclusive, noLocal, noWait, nil)
    if consumeErr != nil {
        return nil, fmt.Errorf("Queue Consume: %s", consumeErr)
    }
    return stream, nil
}

// Close shuts down the channel and then the connection, whichever of the
// two is present, and clears the corresponding field. Errors returned by
// the underlying Close calls are discarded.
func (c *Consumer) Close() {
    if ch := c.channel; ch != nil {
        ch.Close()
        c.channel = nil
    }
    if conn := c.conn; conn != nil {
        conn.Close()
        c.conn = nil
    }
}

// Handle processes deliveries with fn until the process ends; it never
// returns.
//
// It starts `threads` worker goroutines that read from deliveries. When
// Connect's watcher signals on c.done that the connection was lost, Handle
// reconnects through ReConnect (retrying with a growing back-off) and
// starts a fresh set of workers on the new delivery stream. Workers of the
// previous stream end once that stream is closed.
//
// fn receives the message body and returns true when the message was
// processed. Only then is the message acknowledged. A message whose handler
// returns false or panics is left unacknowledged and the status flag is
// cleared; a later successful message asks the server to redeliver
// unacknowledged messages, at most once per RECOVER_INTERVAL_TIME seconds.
//
// queue and routingKey are only used to re-announce the queue after a
// reconnect.
func (c *Consumer) Handle(
    deliveries <-chan amqp.Delivery,
    fn func([]byte) bool,
    threads int,
    queue string,
    routingKey string) {

    // recoverGate is a one-slot channel used as a lock. It guards
    // c.lastRecoverTime and the use of c.channel by the workers, and it is
    // held by this loop while a reconnect replaces c.channel.
    recoverGate := make(chan struct{}, 1)

    for {
        beego.Info("Enter for busy loop with thread:", threads)
        for i := 0; i < threads; i++ {
            // The stream is passed by value so that a worker keeps reading
            // the stream it was started for, even after a reconnect.
            go c.consumeDeliveries(deliveries, fn, recoverGate)
        }

        // Go into reconnect loop when
        // c.done is passed non nil values
        if <-c.done != nil {
            c.currentStatus.Store(false)
            recoverGate <- struct{}{}
            retryTime := 1
            for {
                next, err := c.ReConnect(queue, routingKey, retryTime)
                if err == nil {
                    deliveries = next
                    break
                }
                helper.FailOnError(err, "Reconnecting Error")
                retryTime++
            }
            <-recoverGate
        }
        beego.Info("Reconnected!!!")
    }
}

// consumeDeliveries is the body of one worker: it runs fn on every message
// of deliveries until the stream is closed.
func (c *Consumer) consumeDeliveries(deliveries <-chan amqp.Delivery, fn func([]byte) bool, recoverGate chan struct{}) {
    beego.Info("Enter go with thread with deliveries", deliveries)
    for msg := range deliveries {
        beego.Info("Enter deliver")
        handled := false
        try.This(func() {
            handled = fn(msg.Body)
        }).Finally(func() {
            if !handled {
                // The message is deliberately not Nack'ed with requeue: a
                // handler that fails quickly would get the same message
                // back at once and flood the error reporting service.
                // It stays unacknowledged until the next Recover.
                c.currentStatus.Store(false)
                return
            }
            if err := msg.Ack(false); err != nil {
                beego.Info("Ack failed:", err)
            }
            c.recoverUnackedIfDue(recoverGate)
        }).Catch(func(e try.E) {
            helper.SentryError(e)
        })
    }
}

// recoverUnackedIfDue asks the server to redeliver unacknowledged messages
// when the status flag is false and the last request is older than
// RECOVER_INTERVAL_TIME seconds. It does nothing while another worker or a
// reconnect holds recoverGate.
func (c *Consumer) recoverUnackedIfDue(recoverGate chan struct{}) {
    if c.currentStatus.Load().(bool) {
        return
    }

    // Try-lock: never block a worker on recovery bookkeeping.
    select {
    case recoverGate <- struct{}{}:
        defer func() { <-recoverGate }()
    default:
        return
    }

    now := time.Now().Unix()
    if now-c.lastRecoverTime <= RECOVER_INTERVAL_TIME {
        return
    }
    ch := c.channel
    if ch == nil {
        // Closed and not yet reconnected.
        return
    }

    beego.Info("Try to Recover Unack Messages!")
    c.currentStatus.Store(true)
    c.lastRecoverTime = now
    if err := ch.Recover(true); err != nil {
        beego.Info("Recover Unack Messages failed:", err)
    }
}

Golang RabbitMQ 重新连接功能的示例,包括恢复已注册的消费者。

package main

import (
	"fmt"
	"log"
	"time"
)

// main demonstrates the reconnecting queue: it registers two consumers on
// the "hello" queue of a local broker and then publishes 100 messages, one
// every half second. The connection is closed when main returns.
func main() {
	q := NewQueue("amqp://guest:guest@localhost:5672/", "hello")
	defer q.Close()

	handlers := []func(string){
		func(body string) {
			log.Printf("Received message with second consumer: %s", body)
		},
		func(body string) {
			log.Printf("Received message with first consumer: %s", body)
		},
	}
	for _, handle := range handlers {
		q.Consume(handle)
	}

	const pause = 500 * time.Millisecond
	for n := 0; n < 100; n++ {
		log.Println("Sending message...")
		q.Send(fmt.Sprint("dupa", n))
		time.Sleep(pause)
	}
}
package main

import (
	"github.com/streadway/amqp"
	"log"
	"time"
)

// queue is a RabbitMQ queue client that reconnects on connection loss and
// re-registers its consumers afterwards.
type queue struct {
	url  string // AMQP URL passed to amqp.Dial
	name string // queue name; also used as the routing key by Send

	// errorChannel receives the close notification of the current
	// connection; connect replaces it on every (re)connect.
	errorChannel chan *amqp.Error
	connection   *amqp.Connection
	// channel is whatever openChannel got from the connection; it is nil
	// when opening the channel failed.
	channel      *amqp.Channel
	// closed is set by Close and read by the reconnector goroutine to tell
	// a deliberate shutdown from a lost connection.
	closed       bool

	// consumers holds every successfully registered callback so that
	// recoverConsumers can register them again after a reconnect.
	consumers []messageConsumer
}

// messageConsumer is a callback that receives the body of one delivered
// message as a string.
type messageConsumer func(string)

// NewQueue creates a client for the queue qName on the broker at url.
// It blocks until the first connection attempt succeeds (connect retries
// without limit) and then starts the background reconnector goroutine.
func NewQueue(url string, qName string) *queue {
	q := &queue{
		url:       url,
		name:      qName,
		consumers: make([]messageConsumer, 0),
	}

	q.connect()
	go q.reconnector()

	return q
}

// Send publishes message as a text/plain body to the default exchange,
// using the queue name as the routing key. A publish error is only logged.
func (q *queue) Send(message string) {
	const (
		defaultExchange = ""
		mandatory       = false
		immediate       = false
	)
	payload := amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(message),
	}

	publishErr := q.channel.Publish(defaultExchange, q.name, mandatory, immediate, payload)
	logError("Sending message to queue failed", publishErr)
}

// Consume registers consumer on the queue and starts delivering message
// bodies to it in a background goroutine.
//
// When the registration fails, the error has already been logged by
// registerQueueConsumer; no goroutine is started and the consumer is not
// remembered for recovery. The success message is logged only when the
// registration actually succeeded.
func (q *queue) Consume(consumer messageConsumer) {
	log.Println("Registering consumer...")
	deliveries, err := q.registerQueueConsumer()
	if err == nil {
		log.Println("Consumer registered! Processing messages...")
	}
	q.executeMessageConsumer(err, consumer, deliveries, false)
}

// Close marks the queue as deliberately closed, so the reconnector does not
// reconnect, and then closes the channel and the connection. Errors from
// closing are logged.
func (q *queue) Close() {
	log.Println("Closing connection")
	q.closed = true
	// The channel is nil when openChannel failed on the last connect.
	if q.channel != nil {
		logError("Closing channel failed", q.channel.Close())
	}
	if q.connection != nil {
		logError("Closing connection failed", q.connection.Close())
	}
}

// reconnector runs in its own goroutine. It waits for the close
// notification of the current connection and then reconnects and
// re-registers all consumers.
//
// It returns once the queue has been closed through Close. After a close
// the notification channel is closed as well, so every further receive
// would return immediately; returning here keeps the goroutine from
// spinning forever.
func (q *queue) reconnector() {
	for {
		// connect installs a fresh errorChannel, so the field is read
		// again on every iteration.
		err := <-q.errorChannel
		if q.closed {
			return
		}
		logError("Reconnecting after connection closed", err)

		q.connect()
		q.recoverConsumers()
	}
}

// connect dials the broker until it succeeds, waiting one second after each
// failed attempt. On success it stores the connection, installs a fresh
// close-notification channel, opens a channel and declares the queue.
func (q *queue) connect() {
	for {
		log.Printf("Connecting to rabbitmq on %s\n", q.url)
		conn, dialErr := amqp.Dial(q.url)
		if dialErr != nil {
			logError("Connection to rabbitmq failed. Retrying in 1 sec... ", dialErr)
			time.Sleep(time.Second)
			continue
		}

		q.connection = conn
		q.errorChannel = make(chan *amqp.Error)
		q.connection.NotifyClose(q.errorChannel)

		log.Println("Connection established!")

		q.openChannel()
		q.declareQueue()
		return
	}
}

// declareQueue declares the queue q.name as non-durable, not auto-deleted
// and not exclusive. A failure is only logged.
func (q *queue) declareQueue() {
	const (
		durable    = false
		autoDelete = false
		exclusive  = false
		noWait     = false
	)
	_, declareErr := q.channel.QueueDeclare(q.name, durable, autoDelete, exclusive, noWait, nil)
	logError("Queue declaration failed", declareErr)
}

// openChannel opens a channel on the current connection and stores the
// result in q.channel, whether or not opening succeeded. A failure is only
// logged.
func (q *queue) openChannel() {
	var openErr error
	q.channel, openErr = q.connection.Channel()
	logError("Opening channel failed", openErr)
}

// registerQueueConsumer starts an auto-acknowledging consumer on the queue
// with a server-generated consumer tag. It logs a failure and returns the
// delivery stream together with the error.
func (q *queue) registerQueueConsumer() (<-chan amqp.Delivery, error) {
	const (
		generatedTag = "" // empty tag: let the library pick a unique one
		autoAck      = true
		exclusive    = false
		noLocal      = false
		noWait       = false
	)
	deliveries, consumeErr := q.channel.Consume(q.name, generatedTag, autoAck, exclusive, noLocal, noWait, nil)
	logError("Consuming messages from queue failed", consumeErr)
	return deliveries, consumeErr
}

// executeMessageConsumer starts a goroutine that passes the body of every
// delivery to consumer, until the delivery stream is closed.
//
// It does nothing when err is non-nil. On a first registration (isRecovery
// false) the consumer is also remembered in q.consumers so that it can be
// re-registered after a reconnect.
func (q *queue) executeMessageConsumer(err error, consumer messageConsumer, deliveries <-chan amqp.Delivery, isRecovery bool) {
	if err != nil {
		return
	}
	if !isRecovery {
		q.consumers = append(q.consumers, consumer)
	}

	go func() {
		for d := range deliveries {
			consumer(string(d.Body))
		}
	}()
}

// recoverConsumers registers every remembered consumer again on the current
// channel and restarts its delivery goroutine. It is called by the
// reconnector after a new connection has been established.
//
// A consumer whose registration fails is skipped (the error has already
// been logged by registerQueueConsumer); the success message is logged only
// for consumers that were actually recovered.
func (q *queue) recoverConsumers() {
	for _, consumer := range q.consumers {
		log.Println("Recovering consumer...")
		msgs, err := q.registerQueueConsumer()
		if err == nil {
			log.Println("Consumer recovered! Continuing message processing...")
		}
		q.executeMessageConsumer(err, consumer, msgs, true)
	}
}

// logError writes "message: err" to the standard logger when err is
// non-nil and does nothing otherwise.
func logError(message string, err error) {
	if err == nil {
		return
	}
	log.Printf("%s: %s", message, err)
}

以上是关于 Golang 自动重新连接 RabbitMQ 消费者的主要内容。如果未能解决你的问题,请参考以下文章:

golang 连接、操作完mysql, 对mysql的连接会自动关闭,还是必须要手动关闭?

golang RabbitMQ重新连接功能的示例。包括恢复已注册的消费者。

解决golang使用elastic连接elasticsearch时自动转换连接地址

golang 如何连接redis --- 2022-04-03

解决golang使用elastic连接elasticsearch时自动转换连接地址

解决golang使用elastic连接elasticsearch时自动转换连接地址