springboot 2.X 集成RabbitMQ 详解topic 模式
Posted monco-sxy
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot 2.X 集成RabbitMQ 详解topic 模式相关的知识,希望对你有一定的参考价值。
Topic 模式
重点是理解交换器(exchange)、路由键(routing key)、队列名(queue name)三者之间的绑定关系。
-
topic 发送方:
发送方 关注参数主要有三个 交换器(exchange) 路由键(routing key) 和 消息 -
topic 消费方
消费方 关注点是队列的名字定义 -
topic 配置关系
配置关系,主要关注于同一路由键下的 路由键和队列的 绑定关系 -
重点使用的: 包含匹配规则 routing key 采用 . 分割 ,其中 “” 和 “#” 分别表达的意思是 其中“”用于匹配一个单词,“#”用于匹配多个单词
例如:
交换器名称(exchange name)为 A 队列名(queue name) 分别为 monco.xuzhou.man , monco.nanjing.man , sxy.xuzhou.woman , sxy.nanjing.woman
现在我们需要给包含monco的所有队列,那么我们的路由键设置为 monco.#
现在我们需要给包含sxy的所有xuzhou的队列,那么我们的路由键设置为 sxy.xuzhou.* 或者 sxy.xuzhou.##
"#" 表示 可以配置之后的所有的,"*" 表示可以配置之后的一个
生产者代码如下:
package com.monco.sender;
import com.monco.config.RmConst;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author monco
* @date 2020/4/15
* @description: topic 发送者
*/
@Component
public class TopicSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send() {
String msg1 = "I am email mesaage msg======";
System.out.println("TopicSender send the 1st : " + msg1);
this.rabbitTemplate.convertAndSend(RmConst.EXCHANGE_TOPIC, RmConst.QUEUE_TOPIC_EMAIL, msg1);
String msg2 = "I am user mesaages msg########";
System.out.println("TopicSender send the 2nd : " + msg2);
this.rabbitTemplate.convertAndSend(RmConst.EXCHANGE_TOPIC, RmConst.QUEUE_TOPIC_USER, msg2);
String msg3 = "I am error mesaages msg";
System.out.println("TopicSender send the 3rd : " + msg3);
this.rabbitTemplate.convertAndSend(RmConst.EXCHANGE_TOPIC, "errorkey", msg3);
}
}
配置类代码如下:
/**
* 定义topic模式的交换器
* exchange 交换器 名称 monco.topic.exchange
* 队列名称 monco.topic.email monco.topic.email
* <p>
* routing key 定义 monco.*.user
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(RmConst.EXCHANGE_TOPIC);
}
@Bean
public Queue queueEmailMessage() {
return new Queue(RmConst.QUEUE_TOPIC_EMAIL);
}
@Bean
public Queue queueUserMessage() {
return new Queue(RmConst.QUEUE_TOPIC_USER);
}
@Bean
public Queue queueAllMessage() {
return new Queue(RmConst.QUEUE_TOPIC_All);
}
@Bean
public Binding bindingEmailExchangeMessage() {
return BindingBuilder
.bind(queueEmailMessage())
.to(topicExchange())
.with("monco.topic.email");
}
@Bean
public Binding bindingUserExchangeMessages() {
return BindingBuilder
.bind(queueUserMessage())
.to(topicExchange())
.with("monco.*.user");
}
@Bean
public Binding bindingAllExchangeMessages() {
return BindingBuilder
.bind(queueAllMessage())
.to(topicExchange())
.with("monco.*.user");
}
消费者代码如下:
package com.monco.receiver;
import com.monco.config.RmConst;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author monco
* @date 2020/4/15
* @description: topic email 接收者
*/
@Slf4j
@Component
public class TopicEmailReceiver {
@RabbitHandler
@RabbitListener(queues = "monco.topic.email")
public void processEmail(Message message, Channel channel) throws IOException {
// 手动应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
log.info("processEmail receive" + new String(message.getBody()));
}
@RabbitHandler
@RabbitListener(queues = "monco.topic.user")
public void processUser(Message message, Channel channel) throws IOException {
// 手动应答
channel.basicNack(message.getMessageProperties().getDeliveryTag(), true,false);
log.info("processUser receive" + new String(message.getBody()));
}
@RabbitHandler
@RabbitListener(queues = "monco.topic.user")
public void processUsera(Message message, Channel channel) throws IOException {
// 手动应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
log.info("processUser2 receive" + new String(message.getBody()));
}
@RabbitHandler
@RabbitListener(queues = "monco.monco.user")
public void processAll(Message message, Channel channel) throws IOException {
// 手动应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
log.info("processAll receive" + new String(message.getBody()));
}
}
以上是关于springboot 2.X 集成RabbitMQ 详解topic 模式的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ——SpringBoot集成RabbitMQ
RabbitMQ——SpringBoot集成RabbitMQ
SpringBoot - SpringBoot集成RabbitMQ
SpringBoot | 第十二章:RabbitMQ的集成和使用