RabbitMQ高级特性(2)
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ高级特性(2)相关的知识,希望对你有一定的参考价值。
参考技术A 一、消费端限流1.什么是消费端的限流
假设一个场景,首先,Rabbitmq服务器有上万条未处理的消息,我们随便打开一个消费者客户端,就会出现:巨大的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据!
2.RabbitMQ提供了一种Qos(服务质量控制)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息。
void BasicQos(uint prefetchSize,ushort prefetchCount,bool global);
prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ACK,则该comsumer将block掉,直到有消息ack
global:true/false 是否将上面的设置应用于channel,就是限制是channel级别还是consumer级别
二、消费端的手动ACK和NACK
消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿!
如果由于服务器宕机等严重问题,那我们就需要手动进行ACK保障消费端消费成功!
三、消费端的重回队列
1.消费端重回队列是为了对没有处理成功的消息,把消息重新会递给Broker
2.一般我们在实际应用中,都会关闭重回队列,也就是设置为false
四、TTL队列/消息
1.TTL
TTL:time to live 的缩写,也就是生存时间
RabbitMQ支持消息的过期时间,在消息发送时可以进行指定
RabbitMQ支持队列的过去时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动删除
2.DLX
DLX:Dead-Letter-Exchange,死信队列
利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX
DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性
当一个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列
可以监听DLX这个队列中的消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能
3.消息变成死信的情况
消息被拒绝(basic.reject/basic.nack),并且requeue=false
消息TTL过期
队列达到最大长度
4.DLX死信队列设置
首先需要设置死信队列的exchange(dlx.exchange)、queue(dlx.queue)和路由规则(RoutingKey:#),然后进行绑定
然后进行正常交换机、队列、绑定,只不过我们需要在队列上加上一个参数即可:arguments.put("x-dead-letter-exchange","dlx.exchange");
RabbitMQ——高级特性(SpringBoot实现)
本篇文章的内容与我之前如下这篇文章一样,只是使用技术不同,本篇文章使用SpringBoot实现RabbitMQ的高级特性!
准备工作
我们首先建一个工程,接下来的代码都在这个工程里完成。
主要分为生产者和消费者
依赖
<!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
yml主要配置
server: port: 8100 spring: rabbitmq: host: 101.43.13.118 port: 5672 username: admin password: 123456
在生产者端和消费者端都编写模板配置文件RabbitConfig,后面在配置文件里加队列、交换机等
@Configuration public class RabbitConfig @Resource //定义管理交换机、队列 CachingConnectionFactory connectionFactory; //rabbitmq模板 //定义rabbitTemplate对象操作可以在代码中方便发送消息 @Bean(value = "rabbitTemplate") public RabbitTemplate rabbitTemplate() return new RabbitTemplate(connectionFactory);
1、消息的可靠性投递
说到RabbitMQ里的消息可靠性投递,那我们不得不提到TCC事务(分布式事务)。
TCC事务
TCC的核心思想是:针对每一个操作都需要注册一个和其相对应的确认和补偿的操作,他分为三个阶段Try、Confirm和Cancel
- Try 阶段主要是对业务系统做检测及资源预留
- Confirm 阶段主要是对业务系统做确认提交,Try阶段执行成功并开始执行Confirm阶段时,默认Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。若Confirm阶段真的出错了,需引入重试机制或人工处理。
- Cancel阶段主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。
RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
⚫ confirm 确认模式
⚫ return 退回模式
1、确认模式confirm的SpringBoot实现
生产端yml
server: port: 8100 spring: rabbitmq: host: 101.43.13.118 port: 5672 username: admin password: 123456 virtual-host: / listener: simple: acknowledge-mode: manual #开启ACK:手动确认 retry: max-attempts: 5 #重试次数 initial-interval: 5000 #重试间隔时间 enabled: true #开启消费重试 default-requeue-rejected: false #重试次数超过 publisher-returns: true #开启回退模式 publisher-confirm-type: correlated #确认模式开启(新版本)
首先在生产端的RabbitConfig文件里定义队列和交换机,并绑定。
//队列 @Bean public Queue confirmQueue() return new Queue("confirm.queue"); //路由模式(routes)交换机(direct) @Bean public DirectExchange confirmExchange() return new DirectExchange("confirmExchange"); //绑定路由交换机和队列 //bind:队列 //to:交换机 //with:routingKey @Bean public Binding bindingDirectQueue(Queue confirmQueue,DirectExchange confirmExchange) return BindingBuilder.bind(confirmQueue).to(confirmExchange).with("confirm");
书写一个测试方法,测试生产端,确认模式
@SpringBootTest(classes = com.chw.ProducerMain.class) public class ProducerTesty @Resource private RabbitTemplate rabbitTemplate; /** * 确认模式: * 步骤: * 1. 确认模式开启:ConnectionFactory中开启publisher-confirms="true" * yml里面配置了publisher-confirm-type: correlated #确认模式开启(新版本) * 2. 在rabbitTemplate定义ConfirmCallBack回调函数 */ @Test public void testConfirm() throws InterruptedException //设置交换机处理失败消息的模式 rabbitTemplate.setMandatory(true); //2. 定义回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() /** * @param correlationData 相关配置信息 * @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败 * @param cause 失败原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) System.out.println("confirm方法被执行了...."); if (ack) //接收成功 System.out.println("接收成功消息" + cause); else //接收失败 System.out.println("接收失败消息" + cause); //做一些处理,让消息再次发送。 ); //关键关联数据 CorrelationData data =new CorrelationData(UUID.randomUUID().toString()); //3. 发送消息 //接收成功 //confirm方法被执行了.... //接收成功消息null rabbitTemplate.convertAndSend("confirmExchange", "confirm", "message confirm....",data); //接收失败 //confirm方法被执行了.... //接收失败消息channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirmExchange111' in vhost '/', class-id=60, method-id=40) // rabbitTemplate.convertAndSend("confirmExchange111", "confirm", "message confirm....",data); Thread.sleep(200);
2、return 退回模式
书写一个测试方法,测试生产端,退回模式
/** * 回退模式: 当消息发送给Exchange后,Exchange路由到Queue失败时 才会执行 ReturnCallBack * 步骤: * 1. 开启回退模式:publisher-returns="true" * yml里开启了 * 2. 设置ReturnCallBack * 3. 设置Exchange处理消息的模式: * 1. 如果消息没有路由到Queue,则丢弃消息(默认) * 2. 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack */ @Test public void testReturn() throws InterruptedException //设置交换机处理失败消息的模式 rabbitTemplate.setMandatory(true); //2.设置ReturnCallBack rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() /** * @param message 消息对象 * @param replyCode 错误码 * @param replyText 错误信息 * @param exchange 交换机 * @param routingKey 路由键 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) System.out.println("return 执行了...."); System.out.println(message); System.out.println(replyCode); System.out.println(replyText); System.out.println(exchange); System.out.println(routingKey); //处理 ); //关键关联数据 CorrelationData data =new CorrelationData(UUID.randomUUID().toString()); //3. 发送消息 rabbitTemplate.convertAndSend("confirmExchange", "confirm111", "message confirm....",data); /** * return 执行了.... * (Body:'message confirm....' MessageProperties [headers=spring_returned_message_correlation=afa438b2-5e6e-41f9-91e5-cf0905bb4330, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) * 312 * NO_ROUTE * confirmExchange * confirm111 */ Thread.sleep(200);
1.2、Consumer ACK
ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。
有三种确认方式:
yml里都可以配置
• 自动确认:acknowledge="none"
• 手动确认:acknowledge="manual"
• 根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)
我们来到消费端
消费端yml
server: port: 8100 spring: rabbitmq: host: 101.43.13.118 port: 5672 username: admin password: 123456 publisher-returns: true #开启回退模式 publisher-confirm-type: correlated #确认模式开启(新版本) virtual-host: / listener: simple: acknowledge-mode: manual #开启ACK:手动确认 retry: max-attempts: 5 #重试次数 initial-interval: 5000 #重试间隔时间 enabled: true #开启消费重试 default-requeue-rejected: false #重试次数超过 prefetch: 1 #设置消费端一次拉取多少消息。
我们在业务层里写队列监听
/** * Consumer ACK机制: * 1. 设置手动签收。acknowledge="manual" * yml里设置了 * 2. 使用RabbitListener注解,写明监听的队列queue * 3. 如果消息成功处理,则调用channel的 basicAck()签收 * 4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer * @param message * @param channel * @throws IOException */ @Service public class ConsumerService @RabbitListener(queues = "confirm.queue") public void consumerAck(Message message, Channel channel) throws IOException //message:队列里的消息的所有信息 long deliveryTag = message.getMessageProperties().getDeliveryTag(); try //1.接收转换消息 System.out.println(new String(message.getBody())); //2. 处理业务逻辑 System.out.println("处理业务逻辑..."); // int i = 3/0;//出现错误 //3. 手动签收 channel.basicAck(deliveryTag,true); catch (Exception e) //e.printStackTrace(); //4.拒绝签收 //三个参数:tag -关联数据,bl-不批量确认,b1:true:放回队列,false:不放回队列 /* 第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端 */ channel.basicNack(deliveryTag,true,true); //channel.basicReject(deliveryTag,true);
1.3、消费端限流
/** * Consumer 限流机制 * 1. 确保ack机制为手动确认。(在ack基础上) * 2. listener-container配置属性 * 在yml里设置perfetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。 * @param message * @param channel * @throws Exception */ @RabbitListener(queues = "confirm.queue") public void consumerQos(Message message, Channel channel) throws Exception Thread.sleep(1000); //1.获取消息 System.out.println(new String(message.getBody())); //2. 处理业务逻辑 //3. 签收 channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
小结
在yml中配置 prefetch属性设置消费端一次拉取多少消息。
消费端的确认模式一定为手动确认。yml配置里acknowledge="manual"。
4、TTL
TTL 全称 Time To Live(存活时间/过期时间)
- 当消息到达存活时间后,还没有被消费,会被自动清除。
- RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间
首先在生产端的RabbitConfig文件里定义队列和交换机,并绑定。
//ttl队列 @Bean public Queue ttlQueue() Map<String, Object> args = new HashMap<>(1); // x-message-ttl 声明队列的TTL:队列的过期时间 args.put("x-message-ttl",10000); return QueueBuilder.durable("ttl.queue").withArguments(args).build(); //ttl通配符交换机 @Bean public TopicExchange ttlExchange() return new TopicExchange("ttlExchange"); //绑定TTL通配符交换机和TTL队列 @Bean public Binding bindingDirectQueue(Queue ttlQueue,TopicExchange ttlExchange) return BindingBuilder.bind(ttlQueue).to(ttlExchange).with("ttl.#");
书写测试方法测试TTL
/** * TTL:过期时间 * 1. 队列统一过期 * 2. 消息单独过期 * 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。 * 队列过期后,会将队列所有消息全部移除。 * 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉) */ @Test public void testTtl() /*for (int i = 0; i < 10; i++) // 发送消息 //十秒后队列里的消息会清空,因为ttl队列过期时间为10秒 rabbitTemplate.convertAndSend("ttlExchange", "ttl.hehe", "message ttl...."); */ //消息的后处理对象,设置一些消息的参数信息 MessagePostProcessor messagePostProcessor = new MessagePostProcessor() @Override public Message postProcessMessage(Message message) throws AmqpException //1.设置message的信息 message.getMessageProperties().setExpiration("5000");//设置消息的过期时间 //2.返回该消息 return message; ; //消息单独过期 // rabbitTemplate.convertAndSend("ttlExchange", "ttl.hehe", "message ttl....",messagePostProcessor); for (int i = 0; i < 10; i++) if(i == 5) //消息单独过期 rabbitTemplate.convertAndSend("ttlExchange", "ttl.hehe", "message ttl....",messagePostProcessor); else //不过期的消息 rabbitTemplate.convertAndSend("ttlExchange", "ttl.hehe", "message ttl....");
TTL 小结
设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。
如果两者都进行了设置,以时间短的为准。
1.5、死信队列
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
队列绑定死信交换机:
给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
首先在生产端的RabbitConfig文件里定义队列和交换机,并绑定。
//正常dlx队列 @Bean public Queue dlxQueue() //正常队列关联死信交换机 Map<String, Object> args = new HashMap<>(4); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", "dlxDeadExchange"); // x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key","dlxDead.hehe"); // x-message-ttl 声明队列的TTL 声明队列的过期时间 args.put("x-message-ttl",10000); //设置队列的长度限制 max-length args.put("x-max-length",10); return QueueBuilder.durable("dlx.queue").withArguments(args).build(); //dlx死信队列 @Bean public Queue dlxDeadQueue() return new Queue("dlxDead.queue"); //dlx通配符交换机 @Bean public TopicExchange dlxExchange() return new TopicExchange("dlxExchange"); //dlx死信通配符交换机 @Bean public TopicExchange dlxDeadExchange() return new TopicExchange("dlxDeadExchange"); //绑定dlx正常队列 @Bean public Binding bindingDxlQueue(Queue dlxQueue,TopicExchange dlxExchange) return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("dlx.#"); //绑定dlx死信队列 @Bean public Binding bindingDxlDeadQueue(Queue dlxDeadQueue,TopicExchange dlxDeadExchange) return BindingBuilder.bind(dlxDeadQueue).to(dlxDeadExchange).with("dlxDead.#");
发送测试死信消息
/** * 发送测试死信消息: * 1. 过期时间 * 2. 长度限制 * 3. 消息拒收 */ @Test public void testDlx() //1. 测试过期时间,死信消息 // rabbitTemplate.convertAndSend("dlxExchange","dlx.haha","我是一条消息,我会死吗?"); //2. 测试长度限制后,消息死信 /* for (int i = 0; i < 20; i++) rabbitTemplate.convertAndSend("dlxExchange","dlx.haha","我是一条消息,我会死吗?"); */ //3. 测试消息拒收 rabbitTemplate.convertAndSend("dlxExchange","dlx.haha","我是一条消息,我会死吗?");
测试消息拒收,需要涉及到消费者代码(消费者业务层里书写监听)
/** * 监听器监听正常队列 * 打印结果: * 我是一条消息,我会死吗? * 处理业务逻辑... * 出现异常,拒绝接受 * @param message * @param channel * @throws Exception */ @RabbitListener(queues = "dlx.queue") public void consumerDlxQueue(Message message, Channel channel) throws Exception long deliveryTag = message.getMessageProperties().getDeliveryTag(); try //1.接收转换消息 System.out.println(new String(message.getBody())); //2. 处理业务逻辑 System.out.println("处理业务逻辑..."); int i = 3/0;//出现错误 //3. 手动签收 channel.basicAck(deliveryTag,true); catch (Exception e) //e.printStackTrace(); System.out.println("出现异常,拒绝接受"); //4.拒绝签收,不重回队列 requeue=false //这样才能路由到死信队列去 channel.basicNack(deliveryTag,true,false);
死信队列小结
1. 死信交换机和死信队列和普通的没有区别
2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列消息成为死信的三种情况:
1. 队列消息长度到达限制;
2. 消费者拒接消费消息,并且不重回队列;
3. 原队列存在消息过期设置,消息到达超时时间未被消费;
1.6、延迟队列
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
首先在生产端的RabbitConfig文件里定义队列和交换机,并绑定。
//正常队列 //绑定,设置正常队列过期时间为30分钟 @Bean public Queue orderQueue() //正常队列关联死信交换机 Map<String, Object> args = new HashMap<>(4); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", "dlxDeadOrderExchange"); // x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key","dlxDeadOrder.hehe"); // x-message-ttl 声明队列的TTL 声明队列的过期时间 args.put("x-message-ttl",10000); //设置队列的长度限制 max-length args.put("x-max-length",10); return QueueBuilder.durable("Order.queue").withArguments(args).build(); //dlx死信队列 @Bean public Queue dlxDeadOrderQueue() return new Queue("dlxDeadOrder.queue"); //通配符交换机 @Bean public TopicExchange orderExchange() return new TopicExchange("orderExchange"); //dlx死信通配符交换机 @Bean public TopicExchange dlxDeadOrderExchange() return new TopicExchange("dlxDeadOrderExchange"); //绑定正常队列 @Bean public Binding bindingOrderQueue(Queue orderQueue,TopicExchange orderExchange) return BindingBuilder.bind(orderQueue).to(orderExchange).with("order.#"); //绑定dlx死信队列 @Bean public Binding bindingDxlDeadOrderQueue(Queue dlxDeadOrderQueue,TopicExchange dlxDeadOrderExchange) return BindingBuilder.bind(dlxDeadOrderQueue).to(dlxDeadOrderExchange).with("dlxDeadOrder.#");
测试延迟队列发送消息
@Test public void testDelay() throws InterruptedException //1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息 rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息:id=1,time=2019年8月17日16:41:47"); //2.打印倒计时10秒 for (int i = 10; i > 0 ; i--) System.out.println(i+"..."); Thread.sleep(1000);
消费者代码(消费者业务层里书写监听)
/** * 监听的是延迟队列里的死信队列! * @param message * @param channel * @throws Exception */ @RabbitListener(queues = "dlxDeadOrder.queue") public void consumerDlxDeadOrder(Message message, Channel channel) throws Exception long deliveryTag = message.getMessageProperties().getDeliveryTag(); try //1.接收转换消息 System.out.println(new String(message.getBody())); //2. 处理业务逻辑 System.out.println("处理业务逻辑..."); System.out.println("根据订单id查询其状态..."); System.out.println("判断状态是否为支付成功"); System.out.println("取消订单,回滚库存...."); //3. 手动签收 channel.basicAck(deliveryTag,true); catch (Exception e) //e.printStackTrace(); System.out.println("出现异常,拒绝接受"); //4.拒绝签收,不重回队列 requeue=false channel.basicNack(deliveryTag,true,false);
延迟队列小结
1. 延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。
2. RabbitMQ没有提供延迟队列功能,但是可以使用 : TTL + 死信队列来实现延迟队列效果。
以上是关于RabbitMQ高级特性(2)的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMq高级特性之延迟队列 通俗易懂 超详细 内含案例