RabbitMQ研究高级使用

Posted wu6660563

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ研究高级使用相关的知识,希望对你有一定的参考价值。

消息的流转

mandatory参数

之前的博客已经讲到了,当mandatory=true,交换器无法根据自身的类型和路由键找到一个符合条件的队列,会将消息返回给生产者。当mandatory=false,直接丢弃。

生产者想要获取到没有合适的队列的消息,可以通过channel.addReturnListener来实现监听。

channel.addReturnListener(new ReturnListener() 
    public void handleReturn(int replyCode, String replyText, 
        String exchange, String routingKey, 
        AMQP.BasicProperties basicProperties, byte[] body) throws IOException 
            String message = new String(body);
            System.out.println("Basic.Return返回的消息是:"+message);
        
);

immediate 参数

当immediate参数设置为true时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中,当与路由键匹配的所有队列都没有消费者时,该消息会通过Basic.Return返回到生产者。

mandatory参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。immediate参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递;如果所有的匹配的队列上都没有消费者,则直接将消息返回给生产者,不用将消息存入队列而等待消费者。immediate会影响镜像队列的性能,增加了代码的复杂性,建议采用TTL和DLX的方法替代

备份交换器

备份交换器:Alternate Exchange,简称AE。生产者在发送消息的时候如果不设置mandatory参数,那么消息在未被路由的情况下将会丢失;如果设置了mandatory参数,那么需要添加ReturnListener的逻辑,生产者的代码将变得复杂。如果不想复杂生产者的逻辑,又不想消息丢失,可以使用备份交换器,这样可以将未被路由的消息存储在RabbitMQ中,等需要的时候去处理这些消息。

可以通过在声明交换器channel.exchangeDeclare()的时候添加alternate-exchange参数来实现,也可以通过配置策略的方式实现,如果两个都用了,优先用声明alternate-exchange参数的方式。

  • 声明备份交换器与绑定
//声明一个正常的交换器,并制定备份交换器为myAe
Map<String, Object> args = new HashMap<String , Object>();
args.put("a1ternate-exchange" , "myAe");
channe1.exchangeDec1are( "norma1Exchange" , "direct" , true , fa1se , args);

//声明myAe备份交换器,建议类型是fanout,防止路由键与绑定键不匹配造成消息丢失
channe1.exchangeDec1are( "myAe " , "fanout" , true , fa1se , nu11) ;

//绑定正常的queue和交换器、绑定键
channe1.queueDec1are( "norma1Queue " , true , fa1se , fa1se , nu11);
channe1.queueBind( " norma1Queue ""norma1Exchange" , " norma1Key");

//绑定备份交换器
channe1.queueDec1are("unroutedQueue " , true , fa1se , fa1se , nu11);
channe1.queueBind(" unroutedQueue""myAe" , " ");
  • 配置备份交换器
    rabbitmqctl set_policy AE " ^norma lExchange ♀"、"alternate-exchange": "myAE"'

备份交换器,特殊情况如下:

如果设置的备份交换器不存在,客户端和RabbitMQ服务端都不会有异常出现,此时消息会丢失
如果备份交换器没有绑定任何队列,客户端和RabbitMQ服务端都不会有异常出现,此时消息会丢失
如果备份交换器没有任何匹配的队列,客户端和RabbitMQ服务端都不会有异常出现,此时消息丢失
如果备份交换器和mandatory参数一起使用,那么mandatory参数无效

过期时间TTL

TTL:Time To Live,过期时间。RabbitMQ可以对消息和队列设置TTL。

设置消息的TTL

目前有两种方法设置TTL。一种是通过队列参数x-message-ttl(单位毫秒)来设置属性。第二种是单独对消息本身进行单独设置,每条消息的TTL不同。如果两种都设置了,以最小的数值为准。超过TTL值,会变成死信队列

  • 代码设置
Map<String, Object> argss = new HashMap<String , Object>();
argss.put("x-message-ttl " , 6000);
channel.queueDeclare(queueName , durable , exclusive , autoDelete , argss) ;
  • Policy设置
    rabbitmqctl set_policy TTL ".*" '"message-ttl":60000' --apply-to queues

  • API接口设置
    $ curl -i -u root:root -H "content-type:application/json"-X PUT -d'"auto_delete":false , "durable":true , "arguments":"x-message-ttl":60000' http://localhost:15672/api/queues/vhost/queuename

如果不设置TTL,消息不会过期;如果设置TTL为0,表示除非此时可以直接将消息投递给消费者,否则将立即丢弃。

针对单条消息设置TTL,通过设置builder.expiration即可

//可以通过Properties的Builder建造者模式构建器设置
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);	//持久化消息
builder.expiration("60000");	//设置TTL=60000ms
//后面通过channel.basicPublish发送消息即可
//可以直接不对builder设置,直接在channel.basicPublish后面的Properties里面属性设置setExpiration

设置队列的TTL

通过channel.queueDeclare方法中的x-expires参数可以控制队列被自动删除前处于未使用状态的时间。未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,并且过期时间内也没调用过Basic.Get命令

RabbitMQ在过期后删除队列是异步删除,而不是实时删除!
x-repires参数以毫秒Wie单位,并且服从和x-message-ttl一样的约束条件,不过不能设置为0,比如该参数设置为1000,表示该队列如果在1秒内未使用会被删除。

死信队列

DLX,全称为Dead-Letter-Exchange,成为死信交换器。当消息过期以后,会自动转到死信交换器中,而死信交换器里面的所有队列称为死信队列。

消息进入死信队列的情况:
消息被拒绝(Basic.Reject/Basic.Nack),设置了requeue参数为false;
消息过期
队列达到最大长度

  • 声明设置
    在声明交换器channel.queueDeclare中设置x-dead-letter-exchange参数来讲这个队列添加到DLX
channel.exchangeDeclare("dlx_exchange " , "direct "); //创建DLX: dlx_exchange
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange" , " dlx_exchange ");	//指定死信交换器
args.put("x-dead-letter-routing-key" , "dlx-routing-key");	//指定死信路由键
//为队列myqueue 添加DLX
channel.queueDeclare("myqueue" , false , false , false , args);
  • Policy设置
    rabbitmqctl set_policy DLX ".*" ' "dead-letter- exchange":" dlx_exchange " ' --apply-to queues

创建两个交换器exchange.normalexchange.dlx,分别绑定两个队列queue.normalqueue.dlx

//声明exchange.normal和`exchange.dlx交换器
channel.exchangeDeclare("exchange.dlx", "direct", true);
channel.exchangeDeclare("exchange.normal", "fanout", true);

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 3000);	//设置ttl
args.put("x-dead-letter-exchange", "exchange.dlx");
args.put("x-dead-letter-routing-key", "routingkey");

channel.queueDeclare("queue.normal", true, false, false, args);
channel.queueBind("queue.normal", "exchange.normal", "");	//设置路由键为空

channel.queueDeclare("queue.dlx", true, false, false, null);
channel.queueBind("queue.dlx", "exchange.dlx", "routingKey");
channel.basicPublish("exchange.normal", "rk", MessageProperties.RSISTENT_TEXT_PLAIN, "dlx".getBytes());

延迟队列

延迟队列存储的对象是对应的延迟消息,就是当消息发送以后,并不想让消费者立即拿到消息,而是等待特定时间,再由消费者消费。RabbitMQ本身并不支持延迟队列,而是通过DLX+TTL实现

将消息设置需要延迟的时间,然后发送到正常队列,消费者订阅对应的死信队列即可达到延迟效果。比如在支付状态异步通知的时候,阶梯回调,5秒、10秒、1分钟、30分钟、2小时维度,设置五个正常队列,绑定对应五个死信

优先级队列

优先级队列,就是高优先级的队列具有高的优先权,优先权高德消息具备优先被消费。通过设置x-max-priority参数

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);	//设置最大优先级为10
channel.queueDeclare("queue.priority", true, false, false, args);

优先级最低为0,最高为队列设置的最大优先级x-max-priority。优先级高的优先消费,前提是:如果在消费者的消费速度大于生产者的速度且Borker没有消息堆积,对发送的的消息设置优先级也没有太多实际的意义。因为生产一条就马上消费完毕,生产完就马上消费完毕,大多数情况下,消费者都是闲置的,优先级没太大意义。

RPC实现

在RabbitMQ中实现RPC非常简单。客户端发送请求消息,服务端回复相应消息。为了接收相应的消息,我们需要在请求消息中发送一个回调队列,可以使用默认队列

//-------------生产者-------------
//建立RPC返回消息的Direct Exchange, 消息队列和绑定关系         
channel.exchangeDeclare("rpcReplyExchange", "direct",true);
channel.queueDeclare("rpcReplyQueue", true, false, false, null);
channel.queueBind("rpcReplyQueue", "rpcReplyExchange", "rpcReplyMessage");


BasicProperties props = new BasicProperties.Builder()
    .replyTo(callbackQueueName)
    .correlationId(UUID.randomUUID().toString())	//指定生成RPC请求消息的CorrelationId
    .build();
channel.basicPublish("rpcSendExchange", "rpcSendMessage", props, message.getBytes());

//循环获取回调信息
String response = null;

while(true) 
   //从返回消息中取一条消息
   Delivery delivery = replyCustomer.nextDelivery();
   //如果消息的CorrelationId与发送消息的CorrleationId一致,表示这条消息是
   //发送消息对应的返回消息,是阶乘运算的计算结果。
   System.out.println("The received reply message's correlation id is:" +messageCorrelationId);
   String messageCorrelationId = delivery.getProperties().getCorrelationId();
   if (!Strings.isNullOrEmpty(messageCorrelationId) && messageCorrelationId.equals(correlationId)) 
   		response = new String(delivery.getBody());
   		break;
   



//------------------消费者---------------
Consumer consumer = new DefaultConsumer(channel) 
   @Override
   public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
       //获取返回消息发送到的Exchange名称
       String replyExchange = properties.getReplyTo();

       //设置返回消息的Properties,附带发送消息的CorrelationId.
       AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
               .correlationId(properties.getCorrelationId())
               .build();

       String message = new String(body, "UTF-8");
       System.out.println("The received message is:" + message);
       System.out.println("The received message's correlation id is:" + properties.getCorrelationId());

       String response = "这是计算以后的结果回调信息,这条消息要发送回调发送回生产者";

       //将回调消息发送到Reply Exchange
       this.getChannel().basicPublish(replyExchange, "rpcReplyMessage", replyProps, response.getBytes());
   
;

channel.basicConsume("rpcSendQueue", true, consumer);
  • replyTo:设置回调队列
  • correlationId:用来关联请求(request)和调用RPC之后的回复(response)

持久化

持久化主要是当RabbitMQ在异常情况下(重启、关闭、宕机)下的数据不丢失。RabbitMQ的持久化分为三部分:交换器的持久化队列持久化消息持久化

交换器持久化:声明参数的时候设置durable=true。如果不设置,重启的时候交换器可能丢数据,不过消息不回丢失。意思是部分交换器会丢失,消息无法发送到交换器上面。

队列的持久化:设置deliveryMode=2,实现消息持久化,可以通过参数MessageProperties.PERSISTENT_TEXT_PLAIN也可以设置

注意:
      可以将所有的消息都设置为持久化,但是会影响RabbitMQ的随机读写性能,特别是写入,如果在业务场景中,不需要消息回溯,可以根据实际情况设置为不持久化即可,需要在性能和可靠性权衡。

设置了交换器、队列和消息的持久化,才会持久化。那么是否是只要设置了三个,就一定100%消息持久化不会丢失呢?并不是!

  • 如果订阅消费队列将autoAck=true,但是消费者接收消息还没处理就宕机。数据将会丢失!
    这种建议设置为autoAck=false,手工确认即可。

  • 消息刷盘的时候,先将消息刷入Cache中,然后再一定时间内再写入磁盘文件。这个时候节点宕机、重启,消息还没来得及写入磁盘,消息丢失!
    引入主从(master-slave),保证高可用,除非整个集群全部挂掉

生产者确认

Rabbit为保证消息准确达到服务器,引入两种机制

  • 通过事务机制实现
  • 通过发送方确认机制实现

事务机制

RabbitMQ三个方法与事务有关:channel.txSelectchannel.txCommitchannel.txRollbackchannel.txSelect用于将信道设置为事务模式,channel.txCommit提交事务,channel.txRollback事务回滚。

channel.txSelect();	//设置为事务模式,Borker收到以后,将信道设置为事务模式
//发送消息
channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, "事务消息".getBytes());
//提交事务
channel.commit();

//建议在此处catch一下异常,然后再catch里面channel.txRollback();

发送方确认机制

事务机制会严重降低RabbitMQ的消息吞吐量,这里就引入一种轻重级的方式—发送方确认机制。

生产者将信道设置为confirm模式,一旦进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID,一旦消息被投递到所有匹配的队列后,RabbitMQ会发送一个确认的Basic.Ack给生产者

channel.confirmSelect();	//将信道设置为publisher confirm模式
//之后正常发送消息
channel.basicPublish("exchange", "routingKey", null, "发送确认消息".getBytes());
if(!channel.waitForConfirms()) 
    //处理业务

如果发送多条消息,只需要再最外部包裹一层即可

注意要点:
1、事务机制和确认机制是互斥的,不能共存。同时配置会报错amqp_error , precondition_failed, " cannot switch from tx to confirm mode" , ' confirm .sele ct ';
2、事务机制和确认机制的是消息能够正确发送到RabbitMQ,这里的意思是指消息被正确发送到RabbitMQ的交换器,如果交换器没有匹配的队列,消息也会丢失。

确认机制可以设置为三种:同步confirm、批量confirm、异步confirm

  • 批量confirm方法:每发送一批消息,调用waitForConfirms,等待服务器确认返回
  • 异步confirm方法:提供一个回调方法,服务端确认一条或者多条消息后客户端会回调这个方法进行处理

消费端

消费端可以通过推模式和拉模式的方法来获取消费消息,有几点需要注意:

  • 消息分发
  • 消息顺序性
  • 弃用QueueingConsumer

消息分发

当一个队列有多个消费者的时候,队列的消息以轮询的分发方式发送给消费者。每条消息只会发送给订阅列表的一个消费者。

消息顺序性

消息的顺序性是指消费者消费到的消息和发送者发布的消息都是一致的。默认情况下就是按照顺序的。但是有很多突发场景不能保证。

如果生产者使用了事务机制,在发送消息只会遇到异常进行了回滚、发生超时、中断,那么重新补偿发送这条消息,如果补偿发是在另外一个线程发送的,那么消息就出现了错序

弃用QueueingConsumer

建议不要用这个类来实现订阅消费。4.X版本被标记为@Deprecated,因为内部会导致OOM
整理有以下缺陷:

  • QueueingConsumer会拖累同一个Connection下的所有信道,使其性能降低
  • 同步递归调用QueueingConsumer可能会产生死锁
  • RabbitMQ的自动连接回复机制不支持QueueingConsumer机制
  • QueueingConsumer不是事件驱动

消息传输保障

大多数MQ有三种模式:

  • At most once:最多一次。消息可能会丢失,但绝不会重复传输。
  • At least once:最少一次。消息绝不会丢失,但可能会重复传输。
  • Exactly once:恰好一次。每条消息肯定会被传输一次且仅传输一次。

RabbitMQ支持 最多一次最少一次

以上是关于RabbitMQ研究高级使用的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ延迟队列

RabbitMQ一文带你搞定RabbitMQ延迟队列

在 RabbitMQ 中需要单独的死信交换吗?

RabbitMQ实现延迟发送消息

RabbitMQ ACK、NACK、Type、TTL、死信

RabbitMQ 中的消息会过期吗?