springboot项目整合rabbitMq涉及消息的发送确认,消息的消费确认机制
Posted yangxiaohui227
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot项目整合rabbitMq涉及消息的发送确认,消息的消费确认机制相关的知识,希望对你有一定的参考价值。
1.引入maven依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.在application.yml的配置:
spring:
rabbitmq:
host: 106.52.82.241
port: 5672
username: yang
password: Yangxiaohui227
virtual-host: /
publisher-confirms: true #消息发送后,如果发送成功到队列,则会回调成功信息
publisher-returns: true #消息发送后,如果发送失败,则会返回失败信息信息
listener: #加了2下面2个属性,消费消息的时候,就必须发送ack确认,不然消息永远还在队列中
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
//为了统一管理所有的Mq消息,建一个类存储常量,消息的设计都基本会涉及(队列(queue),交换机(exchange),路由键(route)三个值) public class RabbitMqConstant //下单发送消息 队列名,交换机名,路由键的配置 public final static String SHOP_ORDER_CREATE_EXCHANGE="shop.order.create.exchange"; public final static String SHOP_ORDER_CREATE_ROUTE="shop.order.create.route"; public final static String SHOP_ORDER_CREATE_QUEUE="shop.order.create.queue";
package com.example.demo.mq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; //该类是mq最重要的一个类,所有队列的创建,交换机的创建,队列和交换机的绑定都在这里实现 @Configuration public class RabbitMqConfig private final static Logger log = LoggerFactory.getLogger(RabbitMqConfig.class); @Autowired private CachingConnectionFactory connectionFactory; @Autowired private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer; /** * 单一消费者 * * @return */ @Bean(name = "singleListenerContainer") public SimpleRabbitListenerContainerFactory listenerContainer() SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setConcurrentConsumers(1); factory.setMaxConcurrentConsumers(1); factory.setPrefetchCount(1); factory.setTxSize(1); return factory; /** * 多个消费者 * * @return */ @Bean(name = "multiListenerContainer") public SimpleRabbitListenerContainerFactory multiListenerContainer() SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factoryConfigurer.configure(factory, connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setConcurrentConsumers(20); factory.setMaxConcurrentConsumers(20); factory.setPrefetchCount(20); return factory; /** * 模板的初始化配置 * * @return */ @Bean public RabbitTemplate rabbitTemplate() RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() @Override public void confirm(CorrelationData correlationData, boolean sucess, String cause) if (sucess) log.info("消息发送成功:correlationData(),ack(),cause()", correlationData, sucess, cause); ); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) log.warn("消息丢失:exchange(),route(),replyCode(),replyText(),message:", exchange, routingKey, replyCode, replyText, message); ); return rabbitTemplate; //消息的创建设计三个步骤:队列的创建,交换机创建(direct类型,topic类型,fanout类型),队列和交换机的通过路由键的绑定 //--------- 下单消息配置 //队列 @Bean public Queue shopOrderCreateQueue() return new Queue(RabbitMqConstant.SHOP_ORDER_CREATE_QUEUE, true); //Direct交换机(一对一关系,一个direct交换机只能绑定一个队列,当有2个相同消费者时,如项目部署2台机,只有一个消费者能消费,) @Bean DirectExchange shopOrderCreateExchange() return new DirectExchange(RabbitMqConstant.SHOP_ORDER_CREATE_EXCHANGE); //绑定 @Bean Binding bindShopOrderCreateQueue() return BindingBuilder.bind(shopOrderCreateQueue()).to(shopOrderCreateExchange()).with(RabbitMqConstant.SHOP_ORDER_CREATE_ROUTE);
import com.alibaba.fastjson.JSON; import com.example.demo.domain.ShopOrderMast; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; //专门用一个类作为消息的生产者 @Service public class ShopMessagePublisher @Autowired private RabbitTemplate rabbitTemplate; public void sendCreateOrderMessage(ShopOrderMast orderMast) CorrelationData correlationData=new CorrelationData(); //该参数可以传,可以不传,不传时,correlationData的id值默认是null,消息发送成功后,在RabbitMqConfig类的rabbitTemplate类的confirm方法会接收到该值 correlationData.setId(orderMast.getCodOrderId()); String msg = JSON.toJSONString(orderMast); //convertAndSend该方法有非常多的重构方法,找到适合自己的业务方法就行了,这里我用的是其中一个,发送时指定exchange和route值,这样就会发到对应的队列去了 rabbitTemplate.convertAndSend(RabbitMqConstant.SHOP_ORDER_CREATE_EXCHANGE,RabbitMqConstant.SHOP_ORDER_CREATE_ROUTE,msg,correlationData);
//所有的消费都写在一个消费类中 @Service public class ShopMessageComsumer //监听下单消息 @RabbitListener(queues =RabbitMqConstant.SHOP_ORDER_CREATE_QUEUE) public void createOrderMesaageComsumer(String msg, Channel channel, Message message) try //消息可以通过msg获取也可以通过message的body属性获取 System.out.println("开始消费了"); ShopOrderMast shopOrderMast = JSON.parseObject(msg, ShopOrderMast.class); /** * 因为我在application.yml那里配置了消息手工确认也就是传说中的ack,所以消息消费后必须发送确认给mq * 很多人不理解ack(消息消费确认),以为这个确认是告诉消息发送者的,这个是错的,这个ack是告诉mq服务器, * 消息已经被我消费了,你可以删除它了 * 如果没有发送basicAck的后果是:每次重启服务,你都会接收到该消息 * 如果你不想用确认机制,就去掉application.yml的acknowledge-mode: manual配置,该配置默认 * 是自动确认auto,去掉后,下面的channel.basicAck就不用写了 * */ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); catch (Exception e) try //出现异常,告诉mq抛弃该消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); e.printStackTrace(); catch (IOException e1) e1.printStackTrace();
//这里我发送了一条消息,orderId我设置为555556666666,在消息发送时,存到了CorrelationData对象中,因此,发送成功后,在confirm方法可以接收到该值了
//消息发送成功后,在控制台会看到有成功的回调信息,也就是回调了rabbitTemplate的:
confirm(CorrelationData correlationData, boolean sucess, String cause)
//上面测试的下单消息是direct类型消息的,现在创建一个topic消息
//RabbitMqConstant新增topic的配置信息 //下单topic消息:路由键的名字 星号* 代表多个字符,#号代表一个字符 //topic交换机,发送消息时,发送到指定shop.order.create.topic.exchange和shop.order.create.topic.route中 public final static String SHOP_ORDER_CREATE_TOPIC_EXCHANGE="shop.order.create.topic.exchange"; public final static String SHOP_ORDER_CREATE_TOPIC_TOUTE="shop.order.create.topic.route"; //队列1,通过shop.order.create.topic.*与交换机绑定 public final static String SHOP_ORDER_CREATE_TOPIC_ROUTE_ONE="shop.order.create.topic.*"; public final static String SHOP_ORDER_CREATE_TOPIC_QUEUE_ONE="shop.order.create.topic.queue.one"; //队列2 通过shop.order.create.topic.*与交换机绑定shop.order.create.topic.# public final static String SHOP_ORDER_CREATE_TOPIC_ROUTE_TWO="shop.order.create.topic.#"; public final static String SHOP_ORDER_CREATE_TOPIC_QUEUE_TWO="shop.order.create.topic.queue.two";
//在RabbitMqConfig新增topic队列的基本信息 //--------- 下单消息配置 //队列 @Bean public Queue shopOrderCreateQueue() return new Queue(RabbitMqConstant.SHOP_ORDER_CREATE_QUEUE, true); //Direct交换机(一对一关系,一个direct交换机只能绑定一个队列,当有2个相同消费者时,如项目部署2台机,只有一个消费者能消费,) @Bean DirectExchange shopOrderCreateExchange() return new DirectExchange(RabbitMqConstant.SHOP_ORDER_CREATE_EXCHANGE); //绑定 @Bean Binding bindShopOrderCreateQueue() return BindingBuilder.bind(shopOrderCreateQueue()).to(shopOrderCreateExchange()).with(RabbitMqConstant.SHOP_ORDER_CREATE_ROUTE); //-------------------------下单TOPIC消息的创建 //创建TOPIC交换机 @Bean TopicExchange shopOrderCreateTopicExchange() return new TopicExchange(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_EXCHANGE); //---------------------------//队列1使用自己的route和交换机绑定 //创建队列1 @Bean public Queue shopOrderCreateQueueOne() return new Queue(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_QUEUE_ONE, true); //绑定 @Bean Binding bindShopOrderCreateQueueOne() return BindingBuilder.bind(shopOrderCreateQueueOne()).to(shopOrderCreateTopicExchange()).with(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_ROUTE_ONE); //---------------------------//队列2用自己的route和交换机绑定 //创建队列2 @Bean public Queue shopOrderCreateQueueTWO() return new Queue(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_QUEUE_TWO, true); //绑定 @Bean Binding bindShopOrderCreateQueueTWO() return BindingBuilder.bind(shopOrderCreateQueueTWO()).to(shopOrderCreateTopicExchange()).with(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_ROUTE_TWO);
//消息的发送方新增 //发送TOPIC消息 public void sendCreateOrderTOPICMessage(ShopOrderMast orderMast) CorrelationData correlationData=new CorrelationData(); //该参数可以传,可以不传,不传时,correlationData的id值默认是null,消息发送成功后,在RabbitMqConfig类的rabbitTemplate类的confirm方法会接收到该值 correlationData.setId(orderMast.getCodOrderId()); String msg = JSON.toJSONString(orderMast); //消息发送使用公共route而不是某个队列自己的route rabbitTemplate.convertAndSend(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_EXCHANGE,RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_TOUTE,msg,correlationData);
//消息的消费方新增 //消费者1 @RabbitListener(queues =RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_QUEUE_ONE) public void createOrderMesaageComsumerOne(String msg, Channel channel, Message message) try //消息可以通过msg获取也可以通过message对象的body值获取 System.out.println("我是消费者1"); ShopOrderMast shopOrderMast = JSON.parseObject(msg, ShopOrderMast.class); /** * 因为我在application.yml那里配置了消息手工确认也就是传说中的ack,所以消息消费后必须发送确认给mq * 很多人不理解ack(消息消费确认),以为这个确认是告诉消息发送者的,这个是错的,这个ack是告诉mq服务器, * 消息已经被我消费了,你可以删除它了 * 如果没有发送basicAck的后果是:每次重启服务,你都会接收到该消息 * 如果你不想用确认机制,就去掉application.yml的acknowledge-mode: manual配置,该配置默认 * 是自动确认auto,去掉后,下面的channel.basicAck就不用写了 * */ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); catch (Exception e) try //出现异常,告诉mq抛弃该消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); e.printStackTrace(); catch (IOException e1) e1.printStackTrace(); //消费者2 @RabbitListener(queues =RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_QUEUE_TWO) public void createOrderMesaageComsumerTWO(String msg, Channel channel, Message message) try //消息可以通过msg获取也可以通过message对象的body值获取 System.out.println("我是消费者2"); ShopOrderMast shopOrderMast = JSON.parseObject(msg, ShopOrderMast.class); /** * 因为我在application.yml那里配置了消息手工确认也就是传说中的ack,所以消息消费后必须发送确认给mq * 很多人不理解ack(消息消费确认),以为这个确认是告诉消息发送者的,这个是错的,这个ack是告诉mq服务器, * 消息已经被我消费了,你可以删除它了 * 如果没有发送basicAck的后果是:每次重启服务,你都会接收到该消息 * 如果你不想用确认机制,就去掉application.yml的acknowledge-mode: manual配置,该配置默认 * 是自动确认auto,去掉后,下面的channel.basicAck就不用写了 * */ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); catch (Exception e) try //出现异常,告诉mq抛弃该消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); e.printStackTrace(); catch (IOException e1) e1.printStackTrace();
//测试结果:
//如何在rabbitMq管理页面查看没有还没被消费的消息信息:
通过界面发送Mq消息,场景,如日志发现某条消息没有发送,可以在这里发送回去:
以上是关于springboot项目整合rabbitMq涉及消息的发送确认,消息的消费确认机制的主要内容,如果未能解决你的问题,请参考以下文章
SpringBoot系列5SpringBoot整合RabbitMQ
RabbitMQ——springboot整合RabbitMQ