RabbitMQ消息队列笔记
Posted 算不出来没办法
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ消息队列笔记相关的知识,希望对你有一定的参考价值。
目录
交换机
RabbitMQ 消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列
。生产者只能将消息发送到交换机(exchange)。
交换机工作内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机的类型决定了它们如何处理收到的消息。
交换机的类型:直接(direct)、主题(topic)、标题(headers)、扇出(fanout)
第一个参数表示交换机的名称,空串表示使用默认交换机。
channel.basiPublish("","hello",null,message.getBytes());
绑定:交换机和信道之间的桥梁,让交换机知道和哪个队列进行了绑定,可通过绑定选择性的发送消息。
扇出(fanout)
发布订阅模式:生产者将信息发送到交换机,交换机再将信息发送到其他队列当中。
测试
定义交换机并发送信息
//定义交换机并发送信息
public class Emitlog
// 交换机的名称
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext())
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发出的消息:" + message);
消息接收端
这里的消息接收端需要开启两个,复制下面的代码再新建一个类,修改输出信息即可。
//消息接收
public class ReceiveLogs02
//交换机名称
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception
Channel channel = RabbitMqUtils.getChannel();
//声明一个队列,名称随机,当消费者断开与队列的连接时,队列自动删除
String queueName = channel.queueDeclare().getQueue();
//绑定交换机与队列
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("等待接受消息,把接受到的消息打印在屏幕上...");
DeliverCallback deliverCallback = (consumerTag, message) ->
System.out.println("ReceiveLogs02控制台打印接受到的消息:" + new String(message.getBody()));
;
channel.basicConsume(queueName, true, deliverCallback, consumerTag ->
);
测试结果
两个信道订阅了同一个交换机,都接收到了信息。
直接(direct)
队列只对它绑定的交换机的消息感兴趣。绑定用参数:routingKey 来表示也可称该参数为 binding key, 创建绑定代码如下:
channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");
绑定之后交换机可通过关键字的不同,选择发送到不同的队列当中。
值得注意的是,可以一个key对应一个队列,也可以一个key对应多个队列。一个关键字绑定多个队列效果和扇出效果类似。没有绑定关系的信息将会被丢弃。
测试
生产者
public class DirectLogs
// 交换机的名称
public static final String EXCHANGE_NAME = "direct_logs";
public static final String[] choice = new String[]"error", "info", "warning";
public static void main(String[] args) throws Exception
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext())
String message = scanner.next();
String way = scanner.next();
scanner.nextLine();
channel.basicPublish(EXCHANGE_NAME, way, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发出的消息:" + message);
消费者 1
public class ReceiveLogsDirect01
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception
Channel channel = RabbitMqUtils.getChannel();
//声明一个队列
channel.queueDeclare("console", false, false, false, null);
//绑定交换机与队列
channel.queueBind("console", EXCHANGE_NAME, "info");
channel.queueBind("console", EXCHANGE_NAME, "warning");
DeliverCallback deliverCallback = (consumerTag, message) ->
System.out.println("ReceiveLogsDirect01控制台打印接受到的消息:" + new String(message.getBody()));
;
channel.basicConsume("console", true, deliverCallback, consumerTag ->
);
消费者2
public class ReceiveLogsDirect02
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception
Channel channel = RabbitMqUtils.getChannel();
//声明一个队列
channel.queueDeclare("disk", false, false, false, null);
//绑定交换机与队列
channel.queueBind("disk", EXCHANGE_NAME, "error");
DeliverCallback deliverCallback = (consumerTag, message) ->
System.out.println("ReceiveLogsDirect02控制台打印接受到的消息:" + new String(message.getBody()));
;
channel.basicConsume("disk", true, deliverCallback, consumerTag ->
);
测试结果
信息被发送到了不同的队列当中。
主题(topic)
发送到类型是topic交换机的消息的routing_key不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:"stock.usd.nyse" , "nyse.vmw" , "quick.orange.rabbit" 这种类型的。当然这个单词列表最多不能超过255个字节。
其中可以使用下面两个替换符来进行单词的替代:
* 代替一个单词
# 代替零个或多个单词
匹配案例
*.a.*:三个单词的字符串中,中间单词为 a 即匹配。
#.a:最后一个单词是 a 即匹配。
测试
生产者
public class EmitLogTopic
//交换机的名称
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception
Channel channel = RabbitMqUtils.getChannel();
HashMap<String, String> map = new HashMap<>();
map.put("quick.orange.rabbit", "被队列Q1Q2接收到");
map.put("quick.orange.fox", "被队列Q1接收到");
map.put("lazy.brown.fox", "被队列Q2接收到 ");
map.put("lazy.pink.rabbit", "虽然满足队列Q2的两个绑定但是只会被接收一次");
map.put("quick.orange.male.rabbit", "四个单词不匹配任何绑定会被丢弃");
for (Map.Entry<String, String> bindingKeyEntry : map.entrySet())
String routingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发送消息:" + message);
消费者1
public class ReceiveLogsTopic01
//交换机名称
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception
Channel channel = RabbitMqUtils.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//声明队列
String queueName = "Q1";
channel.queueDeclare(queueName, false, false, false, null);
//队列捆绑
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (consumerTag, message) ->
System.out.println(new String(message.getBody()));
System.out.println("接收队列:" + queueName + "绑定键:" + message.getEnvelope().getRoutingKey());
;
//接收消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag ->
);
消费者2
public class ReceiveLogsTopic02
//交换机名称
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception
Channel channel = RabbitMqUtils.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//声明队列
String queueName = "Q2";
channel.queueDeclare(queueName, false, false, false, null);
//队列捆绑
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(queueName, EXCHANGE_NAME, "*lazy.#");
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (consumerTag, message) ->
System.out.println(new String(message.getBody()));
System.out.println("接收队列:" + queueName + "绑定键:" + message.getEnvelope().getRoutingKey());
;
//接收消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag ->
);
测试结果
先启动两个消费者,然后再启动生产者。
死信队列
死信:顾名思义就是无法被消费的消息。某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
死信来源:消息过期、消息队列已满无法添加新数据、消息被拒绝了并且没有再次将消息放回队列当中。
消息TTL过期测试
生产者
public class Producer
//普通交换机的名称
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception
Channel channel = RabbitMqUtils.getChannel();
//死信消息,设置TTL时间 单位是ms 10000ms是10s
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for (int i = 0; i < 10; i++)
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes(StandardCharsets.UTF_8));
普通消费者1
public class Consumer01
//普通交换机名称
public static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机名称
public static final String DEAD_EXCHANGE = "dead_exchange";
//普通队列名称
public static final String NORMAL_QUEUE = "normal_queue";
//死信队列名称
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception
Channel channel = RabbitMqUtils.getChannel();
//声明死信和普通的交换机类型为direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明普通队列
HashMap<String, Object> arguments = new HashMap<>();
//过期时间
arguments.put("x-message-ttl", 10000);
//正常队列设置死信队列
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//设置死信RoutingKey
arguments.put("x-dead-letter-routing-key", "lisi");
//声明死信和普通队列
channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
//绑定普通的交换机与普通的队列
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
//绑定死信的交换机与死信的队列
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (consumerTag, message) ->
System.out.println("Consumer01接收的消息是:" + new String(message.getBody()));
;
channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag ->
);
死信消费者2
public class Consumer02
//死信队列名称
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception
Channel channel = RabbitMqUtils.getChannel();
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (consumerTag, message) ->
System.out.println("Consumer02接收的消息是:" + new String(message.getBody()));
;
channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag ->
);
测试结果
先启动普通消费者1,然后关闭普通消费者,启动生产者发送数据,在web网页查看。
过了10秒钟之后,由于普通队列信息超时,信息被送往了死信队列。
这个时候打开死信消费者2,接收信息。
队列达到最大长度测试
生产者
public class Producer
//普通交换机的名称
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception
Channel channel = RabbitMqUtils.getChannel();
for (int i = 0; i < 10; i++)
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes(StandardCharsets.UTF_8));
在普通消费者1中增加一行代码设置队列长度
arguments.put("x-max-length", 6);
死信消费者2代码不变
测试结果
首先在web页面删除普通队列,因为参数以及改变了。再次启动普通消费者,然后关闭。启动生产者。
消息被拒绝测试
生产者和测试队列最大长度代码相同,普通消费者修改如下
public class Consumer01
//普通交换机名称
public static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机名称
public static final String DEAD_EXCHANGE = "dead_exchange";
//普通队列名称
public static final String NORMAL_QUEUE = "normal_queue";
//死信队列名称
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception
Channel channel = RabbitMqUtils.getChannel();
//声明死信和普通的交换机类型为direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明普通队列
HashMap<String, Object> arguments = new HashMap<>();
//正常队列设置死信队列
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//设置死信RoutingKey
arguments.put("x-dead-letter-routing-key", "lisi");
//声明死信和普通队列
channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
//绑定普通的交换机与普通的队列
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
//绑定死信的交换机与死信的队列
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (consumerTag, message) ->
String msg = new String(message.getBody());
if (msg.equals(("info5")))
System.out.println("拒绝此消息:" + msg);
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
else
System.out.println("Consumer01接收的消息是:" + new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
;
channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag ->
);
死信消费者代码不变
测试结果
首先在web管理页面删除普通信道,然后开启普通消费者1,最后开启生产者。
延迟队列
延迟队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
在springBoot中集成rabbitMQ,注意springBoot版本不宜过高,推荐2.5.0。
延迟队列测试
代码架构图
Maven依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.73</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
目录结构
配置文件信息
spring.rabbitmq.host=192.168.193.147 #IP地址
spring.rabbitmq.port=5672 #端口号
spring.rabbitmq.username=admin #账号
spring.rabbitmq.password=123 #密码
SwaggerConfig 类
@Configuration
@EnableSwagger2
public class SwaggerConfig
@Bean
public Docket webApiConfig()
return new Docket(DocumentationType.SWAGGER_2)
.groupName("webApi")
.apiInfo(webApiInfo())
.select()
.build();
private ApiInfo webApiInfo()
return new ApiInfoBuilder()
.title("rabbitmq接口文档")
.description("本文档描述了rabbitmq微服务接口定义")
.version("1.0")
.contact(new Contact("enjoy6288", "www.couraki.com", "123456@qq.com"))
.build();
TtlQueueConfig类
导入包的时候很有可能导入错误,所以把包名也一同写出。
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/*
* TTL队列 配置文件类代码
* */
@Configuration
public class TtlQueueConfig
//普通交换机的名称
public static final String X_EXCHANGE = "X";
//死信交换机的名称
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
//普通队列的名称
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
//死信队列的名称
public static final String DEAD_LATTER_QUEUE = "QD";
//声明xExchange
@Bean("xExchange")
public DirectExchange xExchange()
return new DirectExchange(X_EXCHANGE);
//声明yExchange
@Bean("yExchange")
public DirectExchange yExchange()
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
//声明队列
@Bean("queueA")
public Queue queueA()
Map<String, Object> arguments = new HashMap<>(3);
//设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//设置死信Routing-key
arguments.put("x-dead-letter-routing-key", "YD");
//设置TTL 单位是ms
arguments.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
//声明普通队列 TTL为30s
@Bean("queueB")
public Queue queueB()
Map<String, Object> arguments = new HashMap<>(3);
//设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//设置死信Routing-key
arguments.put("x-dead-letter-routing-key", "YD");
//设置TTL 单位是ms
arguments.put("x-message-ttl", 30000);
return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
//死信队列
@Bean("queueD")
public Queue queueD()
return QueueBuilder.durable(DEAD_LATTER_QUEUE).build();
//绑定
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange)
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
//绑定
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
@Qualifier("xExchange") DirectExchange xExchange)
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
//绑定
@Bean
public Binding queueDBindingX(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange)
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
生产者
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController
@Autowired
private RabbitTemplate rabbitTemplate;
//开始发消息
@GetMapping("/sendMsg/message")
public void sendMsg(@PathVariable String message)
log.info("当前时间:,发送一条信息给两个TTL队列:", new Date().toString(), message);
rabbitTemplate.convertAndSend("X", "XA", "消息来自TTL为10s的队列:" + message);
rabbitTemplate.convertAndSend("X", "XB", "消息来自TTL为30s的队列:" + message);
消费者
@Slf4j
@Component
public class DeadLetterQueueConsumer
//接收消息
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws Exception
String msg = new String(message.getBody());
log.info("当前时间:,收到死信队列的消息:", new Date().toString(), msg);
测试结果
首先启动SpringBoot项目,浏览器输入localhost:8080/ttl/sendMsg/nihao
日志输出结果
延迟队列优化
如上面的使用方式,如果我们有新的时间需求,那么就要新增一个队列,也就意味着代码并不通用。我们应当设计一个通用的队列。
代码架构图
TtlQueueConfig 类新增内容
public static final String QUEUE_C = "QC";
//声明QC队列
@Bean("queueC")
public Queue queueC()
Map<String, Object> arguments = new HashMap<>();
//设置死信交换机
arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
//设置死信RoutingKey
arguments.put("x-dead-letter-routing-key","YD");
return QueueBuilder.durable().withArguments(arguments).build();
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange)
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
生产者新增内容
//开始发消息
@GetMapping("sendExpirationMsg/message/ttlTime")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime)
log.info("当前时间:,发送一条时长毫秒TTL信息给队列QC:",
new Date().toString(), ttlTime, message);
rabbitTemplate.convertAndSend("X", "XC", message, msg ->
//发送消息的时候 延迟时长
msg.getMessageProperties().setExpiration(ttlTime);
return msg;
);
测试结果
localhost:8080/ttl/sendExpirationMsg/test1/20000
localhost:8080/ttl/sendExpirationMsg/test2/2000
两条信息先发送test1,延时20秒,然后发送test2,延时2秒。接收信息按照队列先后顺序接收。因为RabbitMQ智慧检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行!
插件实现延迟队列
安装插件
将插件拷贝到 rabbitMQ 插件文件夹中
cp rabbitmq_delayed_message_exchange-3.8.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins/
进入此文件夹
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins/
输入安装指令
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重启服务
systemctl restart rabbitmq-server.service
测试插件
代码架构图
目录结构
生产者新增代码
//开始发消息 基于插件的消息及延迟的时间
@GetMapping("/sendDelayMsg/message/delayTime")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime)
log.info("当前时间:,发送一条时长毫秒信息给延迟队列delayed.queue:",
new Date().toString(), delayTime, message);
rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME
, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg ->
// 发送消息的时候 延迟时长 单位ms
msg.getMessageProperties().setDelay(delayTime);
return msg;
);
消费者
@Slf4j
@Component
public class DeadLetterQueueConsumer
//接收消息
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws Exception
String msg = new String(message.getBody());
log.info("当前时间:,收到死信队列的消息:", new Date().toString(), msg);
DelayedQueueConfig 类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayedQueueConfig
//队列
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
//交换机
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
//routingKey
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
//声明队列
@Bean
public Queue delayedQueue()
return new Queue(DELAYED_QUEUE_NAME);
//声明交换机
@Bean
public CustomExchange delayedExchange()
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message",
true, false, arguments);
//绑定
@Bean
public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,
@Qualifier("delayedExchange") CustomExchange delayedExchange)
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
测试结果
localhost:8080/ttl/sendDelayMsg/test1/20000
localhost:8080/ttl/sendDelayMsg/test2/2000
先发送时长20秒的信息,然后在发送时长2秒的信息,先输出test2,再输出test1。
以上是关于RabbitMQ消息队列笔记的主要内容,如果未能解决你的问题,请参考以下文章