RabbitMQ----消费端限流TTL和使用代码生成交换机队列
Posted 未来.....
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ----消费端限流TTL和使用代码生成交换机队列相关的知识,希望对你有一定的参考价值。
RabbitMQ----消费端限流和TTL
先想一想什么是消费端限流?
假设一个场景,首先我们rabbitmq服务器上面有上万条没有处理的消息,我们随便打开一个消费者客户端,会出现下面情况:巨量消息瞬间全部推送过来,但是我们当个客户端没有办法进行处理这么多的数据,可能会造成服务器宕机。
上面这个假设可能不好理解,看看下面这个例子。
说一家餐馆,你去吃饭,你本来只能吃10碗,老板做了100碗,你本来是吃完一碗叫老板再给你端一碗,但是老板一下子把所有的都给你,说让你吃完,这时候吃的掉吗?此时就引入了一个新名词,自动确认信息,老板认为你能吃完,就全部给你,如果此时你是手动确认信息,你吃完一碗告诉老板,老板给你端上来一碗,每次只吃掉一碗,那么这样就可以解决老板一次性给你端上100百碗了。
此时引出来今天的内容,消费端限流。
1、消费端限流简介
rabbitmq提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(给channel或者consume设置Qos值)未被确认前,不进行消费新消息。
1、1 消费端限流代码:
老规矩:创建springboot项目和两个子项目
1、2 创建application.yml
文件
server:
port: 8001
spring:
rabbitmq:
host: 192.168.31.33
listener:
simple:
# none表示自动确认模式
# manual表示手动确认模式
acknowledge-mode: manual
# 设置每次消费的个数。
prefetch: 3
1、3 创建消费者
@Component
public class MyListener
@RabbitListener(queues = "ban_queue_direct02")
public void listeber(Message message, Channel channel)throws Exception
//获取信息的标识
long deliveryTag = message.getMessageProperties().getDeliveryTag();
byte[] body = message.getBody();
String msg = new String(body);
System.out.println(msg);
try
System.out.println("处理业务逻辑");
//消费端手动确认消息
//long deliveryTag, 表示的标识。
// boolean multiple:是否允许多确认
//为了测试一次只收到3条信息,收到信息不确认即可
//channel.basicAck(deliveryTag,true);//从队列中删除该消息。
catch (Exception e)
//(long deliveryTag, boolean multiple, boolean requeue: 是否让队列再次发送该消息。
channel.basicNack(deliveryTag,true,true);
1、4 生产者:
@Test
public void testReturn()
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback()
@Override
public void returnedMessage(ReturnedMessage returnedMessage)
//只要交换机到队列失败时才会触发该方法。失败之后可以继续发送也可以取消相应的业务功能。
System.out.println("消息从交换机到队列失败" + returnedMessage.getReplyText());
);
for (int i = 0; i < 10; i++)
rabbitTemplate.convertAndSend("ban_exchange_direct", "error", "hello confirm2");
1、5 测试结果
可以发现现在消费者每次只接收到了3条信息,测试成功。
里面测试我加了输出语句,所以一次会出现两条。
2、 TTL简介
RabbitMQ可以对消息和队列设置TTL. 目前有两种方法可以设置。
第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
第二种方法是对消息进行单独设置,每条消息TTL可以不同。如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message, 消费者将无法再收到该消息。
2、1 测试代码:
这里只需要测试生产者,消费者用不到。
//为队列设置过期时间 相当于该队列里面的消息都由过期时间
@Test
public void testSend()
rabbitTemplate.convertAndSend("myexchange","","hello xiaoxi");
//设置消息的过期时间 如果由设置了队列的过期时间 也设置了消息的过期时间 谁的过期时间短 以谁为准。
//该消息必须在头部才能从队列中移除。
@Test
public void testSend02()
for(int i=0;i<10;i++)
if(i==3)
MessagePostProcessor messagePostProcessor = new MessagePostProcessor()
@Override
public Message postProcessMessage(Message message) throws AmqpException
message.getMessageProperties().setExpiration("20000");
return message;
;
//String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor
rabbitTemplate.convertAndSend("myexchange", "", "hello xiaoxi"+i, messagePostProcessor);
else
//String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor
rabbitTemplate.convertAndSend("myexchange", "", "hello xiaoxi"+i);
3、 通过代码创建队列和交换机以及绑定。
没什么讲的,直接上代码:
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig
private final String exchange_name="myexchange";
private final String queue_name="myqueue";
//创建交换机对象
@Bean
public Exchange exchange()
Exchange exchange= ExchangeBuilder.fanoutExchange(exchange_name).durable(true).build();
return exchange;
//创建队列
@Bean(value = "queue")
public Queue queue()
Queue queue= QueueBuilder.durable(queue_name).withArgument("x-message-ttl",20000).build();
return queue;
//绑定交换机和队列
@Bean
public Binding binding(Queue queue, Exchange exchange)
return BindingBuilder.bind(queue).to(exchange).with("").noargs();
在测试单元调用代码测试:
@Test
public void send()
rabbitTemplate.convertAndSend("myexchange","","我饿了");
以上是关于RabbitMQ----消费端限流TTL和使用代码生成交换机队列的主要内容,如果未能解决你的问题,请参考以下文章