RabbitMQ-发布确认高级
Posted weixin_43956692
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ-发布确认高级相关的知识,希望对你有一定的参考价值。
一、介绍
背景:在生产环境中由于一些不明原因,导致rabbitmq重启,在rabbitmq重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。特别是比较极端的情况,rabbitmq集群不可用时。
二、实现
1、正常情况下代码
配置类
@Configuration
public class ConfirmConfig
//交换机
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
//队列
public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
//routingKey
public static final String CONFIRM_ROUTING_KEY = "key1";
//声明交换机
@Bean("confirmExchange")
public DirectExchange confirmExchange()
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
//声明队列
@Bean("confirmQueue")
public Queue confirmQueue()
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
//绑定
@Bean
public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
@Qualifier("confirmExchange") DirectExchange confirmExchange)
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
生产者
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController
@Autowired
private RabbitTemplate rabbitTemplate;
//发消息
@GetMapping("/sendMessage/message")
public void sendMessage(@PathVariable String message)
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY,message);
log.info("发送消息的内容为:",message);
消费者
@Slf4j
@Component
public class ConfirmConsumer
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public void receiveConfirmMessage(Message message)
log.info("接收到队列confirm_queue消息:",new String(message.getBody()));
2、发布确认实现
实现回调接口RabbitTemplate.ConfirmCallback,并且在配置文件中添加下面配置
spring.rabbitmq.publisher-confirm-type=correlated
NONE:禁用发布模式,默认值
CORRELATED:发布消息成功到交换机后出发回调方法
SIMPLE:经测试有两种效果
1、和CORRELATED一样触发回调
2、在消息发布成功使用rabbitTemplate调用waitForConfirms或waitForConfirmOrDie方法等待broker节返回发送结果,根据返回结果来判定下一步的逻辑,需要注意的是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker
@Slf4j
@Component
public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init()
//注入
rabbitTemplate.setConfirmCallback(this);
/**
* 交换机回调方法
* @param correlationData 保存的回调消息的id及相关信息
* @param ack 交换机收到消息 成功为true 失败为false
* @param cause 失败原因 成功为null
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause)
String id = correlationData.getId() != null?correlationData.getId():"";
if(ack)
log.info("交换机已经收到了id为的消息",id);
else
log.info("交换机没有收到id为的消息,由于原因",id,cause);
修改消费者代码
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController
@Autowired
private RabbitTemplate rabbitTemplate;
//发消息
@GetMapping("/sendMessage/message")
public void sendMessage(@PathVariable String message)
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData);
log.info("发送消息的内容为:",message);
但这样只是解决了交换机出现故障,可以有回调,而队列出现问题的时候,还无法解决。
3、回退消息
Mandatory参数
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个时事件的。通过设置mandatory参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。
实现
1>添加如下配置
spring.rabbitmq.publisher-returns=true
2>实现RabbitTemplate.ReturnCallback 接口,重写returnedMessage方法
@Slf4j
@Component
public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init()
//注入
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
/**
* 交换机回调方法
* @param correlationData 保存的回调消息的id及相关信息
* @param ack 交换机收到消息 成功为true 失败为false
* @param cause 失败原因 成功为null
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause)
String id = correlationData.getId() != null?correlationData.getId():"";
if(ack)
log.info("交换机已经收到了id为的消息",id);
else
log.info("交换机没有收到id为的消息,由于原因",id,cause);
//消息不可达时,将消息返回给生产者
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey)
log.info("消息:。被交换机:退回,退回的原因:,路由key:",
new String(message.getBody()),exchange,replyText,routingKey);
4、备份交换机
1>修改之前确认交换机的配置列,添加备份交换机与备份队列,以及报警队列及绑定,同时修改确认交换机的声明(添加备份交换机的配置)
@Configuration
public class ConfirmConfig
//交换机
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
//队列
public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
//routingKey
public static final String CONFIRM_ROUTING_KEY = "key1";
//备份交换机
public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
//备份队列
public static final String BACKUP_QUEUE_NAME = "backup_queue";
//报警队列
public static final String WARNING_QUEUE_NAME = "warning_queue";
//备份交换机
@Bean("backupExchange")
public FanoutExchange backupExchange()
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
//备份队列
@Bean("backupQueue")
public Queue backupQueue()
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
//报警队列
@Bean("warningQueue")
public Queue warningQueue()
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
//声明交换机
@Bean("confirmExchange")
public DirectExchange confirmExchange()
return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
.durable(true)
.withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME) //指定备份交换机
.build();
//声明队列
@Bean("confirmQueue")
public Queue confirmQueue()
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
//绑定
@Bean
public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
@Qualifier("confirmExchange") DirectExchange confirmExchange)
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
//绑定
@Bean
public Binding backupQueueBindingBackupExchange(@Qualifier("backupQueue") Queue backupQueue,
@Qualifier("backupExchange") FanoutExchange backupExchange)
return BindingBuilder.bind(backupQueue).to(backupExchange);
//绑定
@Bean
public Binding warningQueueBindingBackupExchange(@Qualifier("warningQueue") Queue warningQueue,
@Qualifier("backupExchange") FanoutExchange backupExchange)
return BindingBuilder.bind(warningQueue).to(backupExchange);
2>添加报警消费者
@Slf4j
@Component
public class WarningConsumer
@RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
public void receiveWarningMessage(Message message)
String msg = new String(message.getBody());
log.info("报警发现不可路由消息:",msg);
当消息回退和备份交换机同时配置,则备份交换机优先
以上是关于RabbitMQ-发布确认高级的主要内容,如果未能解决你的问题,请参考以下文章