消息队列 RabbitMq高级特性
Posted baidawei
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列 RabbitMq高级特性相关的知识,希望对你有一定的参考价值。
一.消息的可靠投递
在使用RabbitMq的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败的场景。RabbitMQ为我们提供了两种方式用来控制消息的投递可靠性
rabbitMQ 整个消息投递过程为:
producer -> rabbitMQ broker -> exchange -> queue ->consumer
1.confirm 确认模式
消息从producer 到exchange 会返回一个 confirmCallback
2.return 退回模式
消息从exchange到queue投递失败则会返回一个returnCallbak
Confirm确认模式:
1.在配置文件中开启确认模式 publisher-confimrs :true
spring: rabbitmq: host: 10.211.55.4 virtual-host: local port: 5672 username: admin password: admin publisher-confirms: true
2.在rabbitTemplate定义ConfirmCallBack回调函数
@RequestMapping("/producer") public void producer(){ rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override /** * ack exchange交换机是否收到了消息 * cause 失败原因 */ public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ System.out.println("confirm方法 成功了"); }else{ System.out.println("confirm方法 失败了: " + cause); } } }); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.test","spring boot rabbit mq"); }
Return退回模式:
1.开启退回模式
spring: rabbitmq: host: 10.211.55.4 virtual-host: local port: 5672 username: admin password: admin publisher-returns: true
2.设置returnCallBack
@RequestMapping("/return") public void returncallback(){ //设置交换机处理失败消息的模式 rabbitTemplate.setMandatory(true); //设置return call back rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * * @param message 消息对象 * @param replyCode 错误码 * @param replyText 错误信息 * @param exchange 交换机 * @param routingKey 路由键 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("return 执行了...."); System.out.println(message); System.out.println(replyCode); System.out.println(replyText); System.out.println(exchange); System.out.println(routingKey); //处理 } }); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"zxcasd","return call back"); }
二、Consumer ACK 确认机制
表示消费端收到消息后到确认方式
有三种确认方式:
1.自动确认 acknowledge="none" 默认
2.手动确认 acknowledge="manual"
3.根据异常情况确认: acknowledge="auto"
1.开启手动确认
spring: rabbitmq: host: 10.211.55.4 virtual-host: local port: 5672 username: admin password: admin publisher-returns: true listener: simple: acknowledge-mode: manual
2.成功调用channel.basicAck 异常调用channel.basicNack
@Component public class RabbitMQListener implements ChannelAwareBatchMessageListener { @Override @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME) public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try{ System.out.println(new String(message.getBody())); int a = 1/0; //手动确认 channel.basicAck(deliveryTag,true); }catch (Exception ex){ //发生异常 拒绝确认 //第三个参数,重回队列 channel.basicNack(deliveryTag,true,true); } } }
三、消费端 限流
1.确保ack机制为手动确认
2.设置prefetch=1 每次从rabbitmq取几条,ack确认后再取下一条
server: port: 9999 spring: rabbitmq: host: 10.211.55.4 virtual-host: local port: 5672 username: admin password: admin publisher-returns: true listener: simple: acknowledge-mode: manual direct: prefetch: 1
四、TTL
Time To Live 存活时间/过期时间,当消息到达一定时间后,还没有被消费,会被自动清除。
RabbitMQ可以对消息设置过期时间,也可以对整个队列设置过期时间。
1.queue 队列设置过期时间
@Configuration public class RabbitMQConfig { public static final String EXCHANGE_NAME = "exchange_ttl"; public static final String QUEUE_NAME = "queue_ttl"; public static final String Routing_Key = "ttl.#"; // 1.交换机 @Bean("bootExchange") public Exchange bootExchange() { return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); } //2.Queue队列 @Bean("bootQueue") public Queue bootQueue() { return QueueBuilder.durable(QUEUE_NAME).withArgument("x-message-ttl", 5000).build(); } //3.绑定交换机和队列 @Bean public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(Routing_Key).noargs(); } }
2.消息设置过期时间
@RequestMapping("/producer") public void producer(){ MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("5000"); // 消息过期时间 return message; } }; for(int i = 0;i<10;i++){ if(i>5){ rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ttl.test","spring boot rabbit mq"+i,messagePostProcessor); }else{ rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ttl.test","spring boot rabbit mq"+i); } } }
五、死信队列
DLX、Dead Letter Exchange (死信交换机),当消息成为Dead Messag后,可以被重新发送到另一台交换机中。
消息成为死信到三种情况:
1.队列消息长度达到限制,
2.消费者拒接消费信息,basicNack/basicReject 并且不把消息重新放入原目标队列,参数requeue=false。
3.原队列存在消息过期设置,消息到达超时时间未被消费
设定参数 x-dead-letter-exchange 和 x-dead-letter-routing-key
以上是关于消息队列 RabbitMq高级特性的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMq高级特性之延迟队列 通俗易懂 超详细 内含案例
RabbitMQ——RabbitMQ的高级特性(TTL死信队列延迟队列优先级队列RPC)
消息队列专题(高级特性篇):RabbitMQ 如何保证消息的可靠性投递传输和消费
消息队列专题(高级特性篇):RabbitMQ 如何保证消息的可靠性投递传输和消费