RabbitMQ03_Springboot整合RabbitMQ实现发布与订阅模式路由模式通配符模式

Posted 所得皆惊喜

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ03_Springboot整合RabbitMQ实现发布与订阅模式路由模式通配符模式相关的知识,希望对你有一定的参考价值。

①. SpringBoot案例 - 发布与订阅模式

  • ①. 生产和消费者工程如下

  • ②. 导入依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
  • ③. 编写yaml(生产者和消费者一样)
server:
  port: 8080
spring:
  rabbitmq:
    host: 139.198.169.136
    port: 5672
    virtual-host: /myvitrualhost
    username: tang
    password: 9602111022yxTZ@
  • ④. 生产者配置文件如下
@Configuration
public class FanoutRabbitConfig 
    //1. 声明交换机
    @Bean
    public FanoutExchange fanoutOrderExchange() 
        return new FanoutExchange("fanout_order_exchange", true, false);
    

    //2. 声明队列
    @Bean
    public Queue emailQueue() 
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        // return new Queue("TestDirectQueue",true,true,false);
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("email.fanout.queue", true);
    
    @Bean
    public Queue smsQueue() 
        return new Queue("sms.fanout.queue", true);
    
    @Bean
    public Queue weixinQueue() 
        return new Queue("weixin.fanout.queue", true);
    

    //3. 将队列和交换机绑定
    //绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
    @Bean
    public Binding bindingEmail() 
        return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange());
    
    @Bean
    public Binding bindingSms() 
        return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange());
    
    @Bean
    public Binding bindingWeixin() 
        return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange());
    

  • ⑤. service代码以及启动后的效果如下
@Service
public class OrderService 

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //1. 发布与订阅模式
    public void makeOrder(Long userId, Long productId) 
        // 1: 模拟用户下单
        String orderNumer = UUID.randomUUID().toString();
        System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
        // 发送订单信息给RabbitMQ fanout
        rabbitTemplate.convertAndSend("fanout_order_exchange", "", orderNumer);
    

@SpringBootTest
class ProducerApplicationTests 
    @Autowired
    private OrderService orderService;
    @Test
    void contextLoads() 
        orderService.makeOrder(1L,1L);
    

  • ⑥. 三个消费者代码如下
@Service
@RabbitListener(queues = "email.fanout.queue")
public class EmailService 
    @RabbitHandler
    public void messageRevice(String message)
        System.out.println("email----------"+message);
    

@Component
@RabbitListener(queues = "sms.fanout.queue")
public class SmsService 
    @RabbitHandler
    public void messageRevice(String message)
        System.out.println("SMS----------"+message);
    

@Component
@RabbitListener(queues = "weixin.fanout.queue")
public class WeixinService 
    @RabbitHandler
    public void messageRevice(String message)
        System.out.println("Weixin----------"+message);
    

  • ⑦. 先启动生产者,后启动消费者,可以看到如下演示:

②. SpringBoot案例 - 路由模式

  • ①. 生产者代码如下
@Configuration
public class DirectRabbitConfig 

    //1.声明交换机
    @Bean
    public DirectExchange directExchange()
        return new DirectExchange("direct_order_exchange",true,false);
    
    //2.声明队列
    @Bean
    public Queue emailDirectQueue() 
        return new Queue("email.direct.queue", true);
    
    @Bean
    public Queue smsDirectQueue() 
        return new Queue("sms.direct.queue", true);
    
    @Bean
    public Queue weChatDirectQueue() 
        return new Queue("weChat.direct.queue", true);
    
    //3.队列和交换机绑定
    @Bean
    public Binding bingDirectEmail()
        return BindingBuilder.bind(emailDirectQueue()).to(directExchange()).with("email");
    
    @Bean
    public Binding bingDirectSms()
        return BindingBuilder.bind(smsDirectQueue()).to(directExchange()).with("sms");
    
    @Bean
    public Binding bindDirectWeChat()
        return BindingBuilder.bind(weChatDirectQueue()).to(directExchange()).with("weChat");
    

@Service
public class OrderService 
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //2. direct模式
    public void makeDirectOrder(Long userId, Long productId) 
        // 1: 模拟用户下单
        String orderNumer = UUID.randomUUID().toString();
        System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
        // 发送订单信息给RabbitMQ Sms和微信发送消息
        rabbitTemplate.convertAndSend("direct_order_exchange", "sms", orderNumer);
        rabbitTemplate.convertAndSend("direct_order_exchange", "weChat", orderNumer);
    

  // Direct模式
  @Test
  public void DirectTest()
      orderService.makeDirectOrder(1L,1L);
  

  • ②. 消费者代码如下
@Service
@RabbitListener(queues = "email.direct.queue")
public class EmailService 
    @RabbitHandler
    public void messageDirectRevice(String message)
        System.out.println("email direct----------"+message);
    

@Component
@RabbitListener(queues = "sms.direct.queue")
public class SmsService 
    @RabbitHandler
    public void messageDirectRevice(String message)
        System.out.println("SMS direct----------"+message);
    

@Component
@RabbitListener(queues = "weChat.direct.queue")
public class WeixinService 

    @RabbitHandler
    public void messageDirectRevice(String message)
        System.out.println("weChat direct----------"+message);
    

③. SpringBoot案例 - 通配符模式

  • ①. 生产者代码如下
    //3. topic模式
    public void makeTopicOrder(Long userId, Long productId)
        // 1: 模拟用户下单
        String orderNumer = UUID.randomUUID().toString();
        System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
        // 发送订单信息给RabbitMQ Sms和微信发送消息
        // 匹配规则:#.sms.#、#.weChat.#
        rabbitTemplate.convertAndSend("topic_order_exchange", "weChat.sms", orderNumer);
    
  • ②. 消费者使用注解的方式替换配置类
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(bindings =@QueueBinding(
        // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
        value = @Queue(value = "email.topic.queue",autoDelete = "false"),
        // order.fanout 交换机的名字 必须和生产者保持一致
        exchange = @Exchange(value = "topic_order_exchange",
                // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
                type = ExchangeTypes.TOPIC),
        key = "#.email.#"
))
@Component
public class TopicEmailConsumer 
    @RabbitHandler
    public void reviceMessage(String message)
        System.out.println("email------topic模式:"+message);
    

@Component
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(bindings =@QueueBinding(
        // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
        value = @Queue(value = "sms.topic.queue",autoDelete = "false"),
        // order.fanout 交换机的名字 必须和生产者保持一致
        exchange = @Exchange(value = "topic_order_exchange",
                // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
                type = ExchangeTypes.TOPIC),
        key = "#.sms.#"
))
public class TopicSmsConsumer
    @RabbitHandler
    public void reviceMessage(String message)
        System.out.println("sms------topic模式:"+message);
    

@Component
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(bindings =@QueueBinding(
        // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
        value = @Queue(value = "weChat.topic.queue",autoDelete = "false"),
        // order.fanout 交换机的名字 必须和生产者保持一致
        exchange = @Exchange(value = "topic_order_exchange",
                // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
                type = ExchangeTypes.TOPIC),
        key = "#.sms.#"
))
public class TopicWeChatConsumer 
    @RabbitHandler
    public void reviceMessage(String message)
        System.out.println("weChat------topic模式:"+message);
    

以上是关于RabbitMQ03_Springboot整合RabbitMQ实现发布与订阅模式路由模式通配符模式的主要内容,如果未能解决你的问题,请参考以下文章

Spring和SpringBoot整合RabbitMQ

Spring和SpringBoot整合RabbitMQ

2020-03-17 20:18:50springboot整合rabbitmq

SpringBoot整合RabbitMQ--重试/消息序列化--方法/实例

Springboot 整合RabbitMq ,用心看完这一篇就够了

Rabbitmq基本使用 SpringBoot整合Rabbit SpringCloud Stream+Rabbit