RabbitMQ03_Springboot整合RabbitMQ实现发布与订阅模式路由模式通配符模式
Posted 所得皆惊喜
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ03_Springboot整合RabbitMQ实现发布与订阅模式路由模式通配符模式相关的知识,希望对你有一定的参考价值。
①. SpringBoot案例 - 发布与订阅模式
-
①. 生产和消费者工程如下
-
②. 导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
- ③. 编写yaml(生产者和消费者一样)
server:
port: 8080
spring:
rabbitmq:
host: 139.198.169.136
port: 5672
virtual-host: /myvitrualhost
username: tang
password: 9602111022yxTZ@
- ④. 生产者配置文件如下
@Configuration
public class FanoutRabbitConfig
//1. 声明交换机
@Bean
public FanoutExchange fanoutOrderExchange()
return new FanoutExchange("fanout_order_exchange", true, false);
//2. 声明队列
@Bean
public Queue emailQueue()
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("email.fanout.queue", true);
@Bean
public Queue smsQueue()
return new Queue("sms.fanout.queue", true);
@Bean
public Queue weixinQueue()
return new Queue("weixin.fanout.queue", true);
//3. 将队列和交换机绑定
//绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
@Bean
public Binding bindingEmail()
return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange());
@Bean
public Binding bindingSms()
return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange());
@Bean
public Binding bindingWeixin()
return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange());
- ⑤. service代码以及启动后的效果如下
@Service
public class OrderService
@Autowired
private RabbitTemplate rabbitTemplate;
//1. 发布与订阅模式
public void makeOrder(Long userId, Long productId)
// 1: 模拟用户下单
String orderNumer = UUID.randomUUID().toString();
System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
// 发送订单信息给RabbitMQ fanout
rabbitTemplate.convertAndSend("fanout_order_exchange", "", orderNumer);
@SpringBootTest
class ProducerApplicationTests
@Autowired
private OrderService orderService;
@Test
void contextLoads()
orderService.makeOrder(1L,1L);
- ⑥. 三个消费者代码如下
@Service
@RabbitListener(queues = "email.fanout.queue")
public class EmailService
@RabbitHandler
public void messageRevice(String message)
System.out.println("email----------"+message);
@Component
@RabbitListener(queues = "sms.fanout.queue")
public class SmsService
@RabbitHandler
public void messageRevice(String message)
System.out.println("SMS----------"+message);
@Component
@RabbitListener(queues = "weixin.fanout.queue")
public class WeixinService
@RabbitHandler
public void messageRevice(String message)
System.out.println("Weixin----------"+message);
- ⑦. 先启动生产者,后启动消费者,可以看到如下演示:
②. SpringBoot案例 - 路由模式
- ①. 生产者代码如下
@Configuration
public class DirectRabbitConfig
//1.声明交换机
@Bean
public DirectExchange directExchange()
return new DirectExchange("direct_order_exchange",true,false);
//2.声明队列
@Bean
public Queue emailDirectQueue()
return new Queue("email.direct.queue", true);
@Bean
public Queue smsDirectQueue()
return new Queue("sms.direct.queue", true);
@Bean
public Queue weChatDirectQueue()
return new Queue("weChat.direct.queue", true);
//3.队列和交换机绑定
@Bean
public Binding bingDirectEmail()
return BindingBuilder.bind(emailDirectQueue()).to(directExchange()).with("email");
@Bean
public Binding bingDirectSms()
return BindingBuilder.bind(smsDirectQueue()).to(directExchange()).with("sms");
@Bean
public Binding bindDirectWeChat()
return BindingBuilder.bind(weChatDirectQueue()).to(directExchange()).with("weChat");
@Service
public class OrderService
@Autowired
private RabbitTemplate rabbitTemplate;
//2. direct模式
public void makeDirectOrder(Long userId, Long productId)
// 1: 模拟用户下单
String orderNumer = UUID.randomUUID().toString();
System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
// 发送订单信息给RabbitMQ Sms和微信发送消息
rabbitTemplate.convertAndSend("direct_order_exchange", "sms", orderNumer);
rabbitTemplate.convertAndSend("direct_order_exchange", "weChat", orderNumer);
// Direct模式
@Test
public void DirectTest()
orderService.makeDirectOrder(1L,1L);
- ②. 消费者代码如下
@Service
@RabbitListener(queues = "email.direct.queue")
public class EmailService
@RabbitHandler
public void messageDirectRevice(String message)
System.out.println("email direct----------"+message);
@Component
@RabbitListener(queues = "sms.direct.queue")
public class SmsService
@RabbitHandler
public void messageDirectRevice(String message)
System.out.println("SMS direct----------"+message);
@Component
@RabbitListener(queues = "weChat.direct.queue")
public class WeixinService
@RabbitHandler
public void messageDirectRevice(String message)
System.out.println("weChat direct----------"+message);
③. SpringBoot案例 - 通配符模式
- ①. 生产者代码如下
//3. topic模式
public void makeTopicOrder(Long userId, Long productId)
// 1: 模拟用户下单
String orderNumer = UUID.randomUUID().toString();
System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
// 发送订单信息给RabbitMQ Sms和微信发送消息
// 匹配规则:#.sms.#、#.weChat.#
rabbitTemplate.convertAndSend("topic_order_exchange", "weChat.sms", orderNumer);
- ②. 消费者使用注解的方式替换配置类
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(bindings =@QueueBinding(
// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
value = @Queue(value = "email.topic.queue",autoDelete = "false"),
// order.fanout 交换机的名字 必须和生产者保持一致
exchange = @Exchange(value = "topic_order_exchange",
// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
type = ExchangeTypes.TOPIC),
key = "#.email.#"
))
@Component
public class TopicEmailConsumer
@RabbitHandler
public void reviceMessage(String message)
System.out.println("email------topic模式:"+message);
@Component
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(bindings =@QueueBinding(
// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
value = @Queue(value = "sms.topic.queue",autoDelete = "false"),
// order.fanout 交换机的名字 必须和生产者保持一致
exchange = @Exchange(value = "topic_order_exchange",
// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
type = ExchangeTypes.TOPIC),
key = "#.sms.#"
))
public class TopicSmsConsumer
@RabbitHandler
public void reviceMessage(String message)
System.out.println("sms------topic模式:"+message);
@Component
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(bindings =@QueueBinding(
// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
value = @Queue(value = "weChat.topic.queue",autoDelete = "false"),
// order.fanout 交换机的名字 必须和生产者保持一致
exchange = @Exchange(value = "topic_order_exchange",
// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
type = ExchangeTypes.TOPIC),
key = "#.sms.#"
))
public class TopicWeChatConsumer
@RabbitHandler
public void reviceMessage(String message)
System.out.println("weChat------topic模式:"+message);
以上是关于RabbitMQ03_Springboot整合RabbitMQ实现发布与订阅模式路由模式通配符模式的主要内容,如果未能解决你的问题,请参考以下文章
2020-03-17 20:18:50springboot整合rabbitmq
SpringBoot整合RabbitMQ--重试/消息序列化--方法/实例