springboot rabbitmq
Posted haohao1234
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot rabbitmq相关的知识,希望对你有一定的参考价值。
一.消息发送端
pom.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
application.yml
spring: datasource: type: com.alibaba.druid.pool.DruidDataSource username: root password: 123456 url: jdbc:mysql://192.168.99.100:3306/migu_cms?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT&allowMultiQueries=true jackson: date-format: yyyy-MM-dd time-zone: GMT+8 rabbitmq: username: guest password: guest host: 192.168.99.100 publisher-returns: true publisher-confirm-type: correlated #必须配置这个才会确认回调
rabbitTemplate.setConfirmCallback 在消息确认投递到brocker后回调将redis中保存的msg删除
@Configuration public class RabbitConfig { public static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class); @Autowired CachingConnectionFactory cachingConnectionFactory; @Autowired StringRedisTemplate redisTemplate; @Bean RabbitTemplate rabbitTemplate(){ RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);// 从 producer 到 rabbitmq broker cluster 则会返回一个 confirmCallback //1 发送数据并返回(不确认rabbitmq服务器已成功接收) //2 异步的接收从rabbitmq返回的ack确认信息 //3 收到ack后调用confirmCallback函数 rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{ String msgId = correlationData.getId(); if(ack){ redisTemplate.opsForHash().delete(Const.MAIL_CLIENT_LOG, msgId); logger.info(msgId+":消息发送成功"); } else { logger.error(msgId+":消息发送失败"); } }); // 没有投递到QUEUE rabbitTemplate.setReturnCallback((msg, repCode, repText, exchange, routingkey)->{ logger.error("消息发送失败"); // ?? rabbitTemplate.send(msg); }); return rabbitTemplate; } @Bean Queue mailQueue(){ return new Queue(Const.MAIL_QUEUE_NAME, true); } @Bean DirectExchange mailExchange(){ return new DirectExchange(Const.MAIL_EXCHANGE_NAME, true, false); } @Bean Binding mailBinding(){ return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(Const.MAIL_ROUTING_KEY_NAME); } }
redis中保存的message,带时间可以判断超过时间后重试发送,确保发送到brocker
public class MessageWithTime<T> { private long time; private T message; }
执行完业务发送消息
// 新增banner bannerMapper.insertSelective(banner); // 发送rabbitmq String msgId = UUID.randomUUID().toString(); rabbitTemplate.convertAndSend(Const.MAIL_EXCHANGE_NAME, Const.MAIL_ROUTING_KEY_NAME, banner, new CorrelationData(msgId)); // redis缓存保存作为可靠性 MessageWithTime<Banner> messageWithTime = new MessageWithTime<>(System.currentTimeMillis(), banner); redisTemplate.opsForHash().put(Const.MAIL_CLIENT_LOG, msgId, new Gson().toJson(messageWithTime));
定时从redis中获取未确认的消息,进行重发
@Component public class RetryTask { public static final Logger logger = LoggerFactory.getLogger(RetryTask.class); @Autowired StringRedisTemplate redisTemplate; @Autowired RabbitTemplate rabbitTemplate; @PostConstruct private void startRetry(){ new Thread(()->{ while(true){ try { Thread.sleep(Const.RETRY_TIME_INTERVAL); Map<Object, Object> map = redisTemplate.opsForHash().entries(Const.MAIL_CLIENT_LOG); for(Map.Entry entry : map.entrySet()){ MessageWithTime<Banner> messageWithTime = (MessageWithTime<Banner>) entry.getValue(); String msgId = (String)entry.getKey(); // 超过三次重试 if(messageWithTime.getTime() + 3*Const.RETRY_TIME_INTERVAL < System.currentTimeMillis()){ logger.error("发送消息失败超过15秒 " + new Gson().toJson(messageWithTime)); redisTemplate.opsForHash().delete(Const.MAIL_CLIENT_LOG, entry.getKey()); } else { // 重试 rabbitTemplate.convertAndSend(Const.MAIL_EXCHANGE_NAME, Const.MAIL_ROUTING_KEY_NAME, messageWithTime.getMessage(), new CorrelationData((String)entry.getKey())); logger.info("重试发送消息:"+ msgId); } } } catch (Exception e) { e.printStackTrace(); } } }).start(); } }
二。消费端
消息的事务性
(1) 处理成功,从队列中删除消息
(2) 处理失败(网络问题,程序问题,服务挂了),将消息重新放回队列
为了做到这点,我们使用rabbitmq的手动ack模式
pom.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
application.properties
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.host=192.168.99.100
spring.rabbitmq.port=5672
# 开启手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=100
@Component public class MailReceiver { public static final Logger logger = LoggerFactory.getLogger(MailReceiver.class); @Autowired JavaMailSender javaMailSender; @Autowired MailProperties mailProperties; @Autowired TemplateEngine templateEngine; @Autowired StringRedisTemplate redisTemplate; @RabbitListener(queues = Const.MAIL_QUEUE_NAME) public void handler(Message message, Channel channel){ Long tag = null; try { Banner banner = (Banner) message.getPayload(); MessageHeaders header = message.getHeaders(); tag = (Long)header.get(AmqpHeaders.DELIVERY_TAG); String msgId = (String)header.get("spring_returned_message_correlation");
// 防止重复消费 if(redisTemplate.opsForHash().hasKey(Const.MAIL_SERVER_LOG, msgId)){ logger.info("消息已经被消费:"+msgId); channel.basicAck(tag, false); return; } // 发送邮件 MimeMessage msg = javaMailSender.createMimeMessage(); MimeMessageHelper helper = new MimeMessageHelper(msg); helper.setTo("1234@qq.com"); helper.setFrom("1234@qq.com"); helper.setSubject("创建Banner"); helper.setSentDate(new Date()); Context context = new Context(); context.setVariable("bannerId", banner.getId()); context.setVariable("bannerName", banner.getBannerName()); context.setVariable("createTime", banner.getCreateTime()); String mail = templateEngine.process("mail", context); helper.setText(mail, true); javaMailSender.send(msg); // 成功后发送ack确认,会将队列中该消息删除 channel.basicAck(tag, false); redisTemplate.opsForHash().put(Const.MAIL_SERVER_LOG, msgId, "java"); logger.info("邮件发送成功"); } catch (Exception e) { e.printStackTrace(); // 消费失败后发送nack使信息重新投递 // channel.basicNack否认消息,消息重回队列给下一个消费者消费 try { channel.basicNack(tag, false, true); } catch (IOException ex) { ex.printStackTrace(); } logger.error("邮件发送失败"); } } }
以上是关于springboot rabbitmq的主要内容,如果未能解决你的问题,请参考以下文章