RabbitMQ—基础客户端开发

Posted 与昊

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ—基础客户端开发相关的知识,希望对你有一定的参考价值。

本文以Go语言为例,来讲解RabbitMQ的客户端开发要点。首先执行命令安装amqp依赖包:

go get github.com/rabbitmq/amqp091-go

连接RabbitMQ

建立连接

func Dial(url string) (*Connection, error)
  • url:RabbitMQ服务端url,例如:amqp://admin:admin@127.0.0.1:5672/

创建信道

func (c *Connection) Channel() (*Channel, error)

Connection可以用来创建多个Channel实例,但是Channel实例不能在线程间共享,应用程序应该为每一个线程开辟一个Channel。某些情况下Channel的操作可以并发运行,但是在其他情况下会导致通信错误,同时也会影响发送方确认(publisher confirm)机制的运行,所以多线程间共享Channel实例是非线程安全的。

使用交换器和队列

声明交换器

func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error
  • name:交换器的名称。
  • kind:交换器的类型,常见的如 fanout、direct、topic。
  • durable:设置是否持久化。设置为持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
  • autoDelete:设置是否自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,当所有与这个交换器绑定的队列或者交换器都与此解绑时,会自动删除该交换器。注意不能错误地把这个参数理解为“当与此交换器连接的客户端都断开时,RabbitMQ会自动删除本交换器”。
  • internal:设置是否是内置交换器。如果设置为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
  • noWait:是否非阻塞等待服务器返回。设置为true则不会阻塞等待RabbitMQ Server的返回信息(实际上服务器也不会返回),此时立即使用这个交换器可能会导致异常,建议设置为false。
  • args:其他一些结构化参数,比如alternate-exchange等。

还有另一个类似的参数完全一样的方法ExchangeDeclarePassive,这个方法主要用来检测相应的交换器是否存在,如果存在则正常返回,不存在则抛出异常,同时Channel也会被关闭。

删除交换器

func (ch *Channel) ExchangeDelete(name string, ifUnused, noWait bool) error
  • name:交换器名称。
  • ifUnused:设置是否在交换器没有被使用的情况下删除。如果设置为true,则只有在此交换器没有被使用的情况下才会被删除;如果设置为false,则无论如何这个交换器都要被删除。
  • noWait:是否非阻塞等待服务器返回,建议设置为false。

声明队列

func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)
  • name:队列的名称。
  • durable:设置是否持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失队列及消息(不丢失是相对的,如果宕机时有消息没来得及存盘,还是会丢失)。
  • autoDelete:设置是否自动删除。自动删除的前提是至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。不能把这个参数错误地理解为“当连接到此队列的所有客户端断开时,这个队列会自动删除”,因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。
  • exclusive:设置是否排他。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。有几点需要注意:排他队列是基于连接可见的,同一个连接的不同信道是可以同时访问同一连接创建的排他队列;如果一个连接已经声明了一个排他队列,其他连接不允许建立同名的排他队列;即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除。这种队列适用于一个客户端同时发送和读取消息的应用场景。
  • noWait:是否非阻塞等待服务器返回,建议设置为false。
  • args:设置队列的其他一些参数,如x-message-ttl、x-expires等。

生产者和消费者都能够使用queueDeclare来声明一个队列,但是如果消费者在同一个信道上订阅了另一个队列,就无法再声明队列了。必须先取消订阅,然后将信道置为“传输”模式,之后才能声明队列。

同样这里还有一个参数完全一样的queueDeclarePassive方法,这个方法用来检测相应的队列是否存在。如果存在则正常返回,如果不存在则抛出异常,同时Channel也会被关闭。

删除队列

func (ch *Channel) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)
  • name:队列名称。
  • ifUnused:设置是否在队列没有被使用的情况下删除。
  • ifEmpty:设置是否在队列为空(没有任何消息堆积)的情况下才能够删除。
  • noWait:是否非阻塞等待服务器返回,建议设置为false。

清空队列

func (ch *Channel) QueuePurge(name string, noWait bool) (int, error)
  • name:队列名称。
  • noWait:是否非阻塞等待服务器返回,建议设置为false。

队列绑定

func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error
  • name:队列名称。
  • key:用来绑定队列和交换器的键。
  • exchange:交换器名称。
  • noWait:是否非阻塞等待服务器返回,建议设置为false。
  • args:定义绑定的一些参数。

队列解绑

func (ch *Channel) QueueUnbind(name, key, exchange string, args Table) error
  • name:队列名称。
  • key:用来绑定队列和交换器的键。
  • exchange:交换器名称。
  • args:定义解绑的一些参数。

交换器绑定

func (ch *Channel) ExchangeBind(destination, key, source string, noWait bool, args Table) error
  • destination:目的交换器,通常是内部交换器。
  • key:用来绑定源交换器和目的交换器的键。
  • source:源交换器。
  • nowait:是否非阻塞等待服务器返回,建议设置为false。
  • args:定义绑定的一些参数。

生产者发送消息至源交换器中,源交换器根据路由键找到与其匹配的目的交换器,并将消息转发到给目的交换器,进而存储在目的交换器绑定的队列中。

发送消息

func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error
  • exchange:交换机名称,指明消息需要发送到哪个交换器中。如果设置为空字符串,则消息会被发送到默认交换器中。
  • key:路由键,交换器根据路由键将消息存储到相应的队列中。
  • mandatory:建议为false,后面有专门章节讲解。
  • immediate :建议为false,后面有专门章节讲解。
  • msg:要发送的消息,msg对应一个Publishing结构,Publishing结构里面有很多属性,除了注释的几个之外,大多很少使用。
type Publishing struct {
    // headers类型交换器会使用
    Headers Table

        // 属性
    ContentType     string    // 消息类型
    ContentEncoding string    // 消息编码
    DeliveryMode    uint8     // 是否持久化,0或1非持久化,2持久化
    Priority        uint8     // 优先级,0 - 9
    CorrelationId   string    // 关联id,有助于将RPC响应与请求相关联
    ReplyTo         string    // RPC响应的回调地址,常用于命名回调队列
    Expiration      string    
    MessageId       string    
    Timestamp       time.Time 
    Type            string    
    UserId          string    
    AppId           string    

    // 消息体
    Body []byte
}

消费消息

RabbitMQ的消费模式分两种:推(Push)模式和拉(Pull)模式。

推模式

在推模式中,可以通过持续订阅的方式来消费消息。

func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
  • queue:队列名称。
  • consumer:消费者标签,用于区分不同的消费者。
  • autoAck:设置是否自动确认,建议设置成false。
  • exclusive:设置是否排他,排他表示当前队列只能给一个消费者使用。
  • noLocal:设置为true则表示不能将同一个Connection中生产者发送的消息传送给这Connection中的消费者。
  • nowait:是否非阻塞等待服务器返回,建议设置为false。
  • args:设置消费者的其他参数。

此函数返回一个单向Delivery类型通道,遍历该通道,有消息则进行处理,没有则阻塞。如果需要取消订阅的话,可以调用Cancel方法:

func (ch *Channel) Cancel(consumer string, noWait bool) error
  • consumer:消费者标签,用于区分不同的消费者。
  • nowait:是否非阻塞等待服务器返回,建议设置为false。

拉模式

func (ch *Channel) Get(queue string, autoAck bool) (msg Delivery, ok bool, err error)
  • queue:队列名称。
  • autoAck:设置是否自动确认,建议设置成false。

推模式下将信道置为接收模式,直到取消队列的订阅为止。在接收模式期间,RabbitMQ会不断地推送消息给消费者(当然推送消息的个数还是会受到Qos的限制)。如果只想从队列获得单条消息而不是持续订阅,建议还是使用拉模式。但是不能将Get方法放在一个循环里来代替推模式,这样做会严重影响RabbitMQ的性能。如果要实现高吞吐,消费者理应使用推模式。

消费端的确认与拒绝

为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制。消费者在订阅队列时,可以指定autoAck参数,当autoAck等于false时,RabbitMQ会等待消费者显式地回复确认信号后才删除消息。当autoAck等于true时,RabbitMQ会自动把发送出去的消息置为确认,然后删除,而不管消费者是否真正地消费到了这些消息。

当autoAck参数置为false,对于RabbitMQ服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费者的消息,另一部分是已经投递给消费者,但是还没有收到消费者确认信号的消息。如果RabbitMQ一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则RabbitMQ会安排该消息重新进入队列,等待投递给下一个消费者(当然也有可能还是原来的那个消费者)。

RabbitMQ不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。

确认消息

func (ch *Channel) Ack(tag uint64, multiple bool) error
func (d Delivery) Ack(multiple bool) error
  • tag:可以看作是消息的编号。
  • multiple:设置是否批量确认。true表示确认编号为tag之前所有未被当前消费者确认的消息,false表示仅确认编号为tag的消息。

拒绝消息

func (ch *Channel) Reject(tag uint64, requeue bool) error
func (d Delivery) Reject(requeue bool) error 
  • tag:可以看作是消息的编号。
  • requeue:设置是否将消息重新加入队列。如果设置为true,则RabbitMQ会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者;如果设置为false,则立即会将消息从队列中移除。

批量拒绝消息

func (ch *Channel) Nack(tag uint64, multiple bool, requeue bool) error
func (d Delivery) Nack(multiple, requeue bool) error
  • tag:可以看作是消息的编号。
  • multiple:设置是否批量拒绝。
  • requeue:设置是否将消息重新加入队列。

注:将Reject或者Nack方法中的requeue设置为false,可以启用“死信队列”的功能。死信队列可以通过检测被拒绝或者未送达的消息来追踪问题。

恢复消息

func (ch *Channel) Recover(requeue bool) error
  • requeue:设置是否将消息重新加入队列。

此方法用来请求RabbitMQ重新发送还未被确认的消息。如果requeue参数设置为true,则未被确认的消息会被重新加入到队列中,这样对于同一条消息来说,可能会被分配给与之前不同的消费者。如果requeue参数设置为false,那么同一条消息会被分配给与之前相同的消费者。

注:Delivery的确认和拒绝相关方法实际上都调用了Channel的同名方法,一般情况下推荐使用Delivery的相关方法。

关闭连接

func (ch *Channel) Close() error
func (c *Connection) Close() error

显式地关闭Channel是个好习惯,但不是必须的,在Connection关闭的时候,Channel也会自动关闭。

还可以对连接和信道注册监听器来监听关闭事件:

func (c *Connection) NotifyClose(receiver chan *Error) chan *Error
func (ch *Channel) NotifyClose(c chan *Error) chan *Error

以上是关于RabbitMQ—基础客户端开发的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ-从基础到实战— Hello RabbitMQ

RabbitMQ-从基础到实战— 防止消息丢失

JSP基础

实现一个轻量级高可复用的RabbitMQ客户端

RabbitMQ-从基础到实战— 防止消息丢失

RabbitMQ 基础概念