RabbitMQ_5主题模式
Posted Rzk
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ_5主题模式相关的知识,希望对你有一定的参考价值。
主题模式
基于模式(主题)接收消息
*(星号)可以正好代替一个词。
# (hash) 可以代替零个或多个单词。
路由键设置为“ quick.orange.rabbit ”的消息将发送到两个队列
消息“ lazy.orange.elephant ”也会发给他们两个。
另一方面,“ quick.orange.fox ”只会进入第一个队列,而“ lazy.brown.fox ”只会进入第二个队列。
“ lazy.pink.rabbit ”只会被传送到第二个队列一次,即使它匹配了两个绑定。
“ quick.brown.fox ”不匹配任何绑定,因此将被丢弃。
如果我们违反約定并发送一到四个字的消息,例如“ orange ”或“ quick.orange.male.rabbit ”,会发生什么?好吧,这些消息不会匹配任何绑定并且会丢失。
另一方面,“ lazy.orange.male.rabbit ”,即使它有四个单词,也会匹配最后一个绑定,并将被传递到第二个队列。
主題隊列 消息生產者
String infoRoutingKey = "info.message.orange";
String errorRoutingKey = "error.rabbit.lazy";
String warningRoutingKey = "orange.warning.message";
/**
* @PackageName : com.rzk
* @FileName : Send
* @Description : 主题队列-消息生产者
* @Author : rzk
* @CreateTime : 23/6/2021 上午12:21
* @Version : 1.0.0
*/
public class Send {
//定义交换机名称
private final static String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] argv) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("*");
factory.setUsername("yeb");
factory.setVirtualHost("/yeb");
factory.setPassword("yeb");
factory.setPort(5672);
try (
//连接工厂创建连接
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel()) {
//绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//
String infoMessage = " 普通消息 ";
String errorMessage = " 错误消息 ";
String warningMessage = " 警告消息 ";
//需要准备对应的路由
String infoRoutingKey = "info.message.orange";
String errorRoutingKey = "error.rabbit.lazy";
String warningRoutingKey = "orange.warning.message";
//队列消息的生产者:发送消息
channel.basicPublish(EXCHANGE_NAME, infoRoutingKey, null, infoMessage.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME, errorRoutingKey, null, errorMessage.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME, warningRoutingKey, null, warningMessage.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent \'" + infoMessage + "\'");
System.out.println(" [x] Sent \'" + errorMessage + "\'");
System.out.println(" [x] Sent \'" + warningMessage + "\'");
}
}
}
消息接收
topic1會匹配到以下兩個:String infoRoutingKey = "info.message.orange";
String warningRoutingKey = "orange.warning.message";
主题队列 topic1
/**
* @PackageName : com.rzk.simple.recv
* @FileName : Recv
* @Description : 主题队列-消息接收
* @Author : rzk
* @CreateTime : 23/6/2021 上午12:22
* @Version : 1.0.0
*/
public class Recv01 {
private final static String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] argv) throws Exception {
//创建工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("120.55.192.186");
factory.setUsername("yeb");
factory.setVirtualHost("/yeb");
factory.setPassword("yeb");
factory.setPort(5672);
//连接工厂创建连接
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//获取队列(排他队列
String queueName = channel.queueDeclare().getQueue();
//队列绑定交换机
String errorRoutingKey = "#.message.#";
channel.queueBind(queueName,EXCHANGE_NAME,errorRoutingKey);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received \'" + message + "\'");
};
/**
* 监听队列消费消息
* autoAck:自动应答
* 当消费者收到该消息,会返回通知消息队列 我消费者已经收到消息了
*/
channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
}
}
主题队列 topic2
topic2匹配的是error:String errorRoutingKey = "error.rabbit.lazy";
/**
* @PackageName : com.rzk.simple.recv
* @FileName : Recv
* @Description : 主题队列-消息接收
* @Author : rzk
* @CreateTime : 23/6/2021 上午12:22
* @Version : 1.0.0
*/
public class Recv02 {
private final static String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] argv) throws Exception {
//创建工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("120.55.192.186");
factory.setUsername("yeb");
factory.setVirtualHost("/yeb");
factory.setPassword("yeb");
factory.setPort(5672);
//连接工厂创建连接
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//获取队列(排他队列)
String queueName = channel.queueDeclare().getQueue();
//队列绑定交换机
String routingKey = "*.rabbit.*";
channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received \'" + message + "\'");
};
//监听队列消费消息
channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
}
}
以上是关于RabbitMQ_5主题模式的主要内容,如果未能解决你的问题,请参考以下文章
springboot2.5.6集成RabbitMq,实现Topic主题模式