RabbitMQ(黑马spring cloud笔记)

Posted yangsf_

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ(黑马spring cloud笔记)相关的知识,希望对你有一定的参考价值。

MQ

目录

一、同步通讯和异步通讯

1. 同步通讯

优点

  • 时效性强,立即获取结果

缺点

  • 耦合度高
  • 性能和吞吐能力不如异步
  • 额外资源消耗
  • 级联失败问题

2. 异步通讯

优点

  • 服务解耦
  • 性能提升,吞吐量提高
  • 服务没有强依赖,不担心级联问题
  • 流量削峰

缺点

  • 依赖Broker的可靠性、安全性、吞吐能力
  • 架构复杂的情况下,业务没有明显的流程线,不好追踪管理

MQ即是事件驱动架构中的Broker。

二、RabbitMQ

1. 部署

直接docker拉一个:

# 拉取镜像
docker pull rabbitmq:3-management
#启动容器
docker run \\
 -e RABBITMQ_DEFAULT_USER=root \\
 -e RABBITMQ_DEFAULT_PASS=123456 \\
 --name mq \\
 --hostname mq1 \\
 -p 15672:15672 \\
 -p 5672:5672 \\
 -d \\
 rabbitmq:3-management
 # 15672是管理口

2. 架构

几个概念:

  • channel:操作MQ的工具
  • exchange:路由消息到队列中
  • queue:缓存消息
  • virtual host:虚拟主机,对queue、exchange等资源的逻辑分组

3. 常见消息模型

3.1 基本消息队列(Basic Queue)

  1. 依赖

    <!--AMQP依赖,包含RabbitMQ-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    amqp是高级消息队列协议,springAMQP则是一种实现。

  2. 配置

    spring:
      rabbitmq:
        host: 190.92.246.107 # 主机名
        port: 5672 # 端口
        virtual-host: / # 虚拟主机
        username: root
        password: 123456
    
  3. 实现

    • 发布者

      public class PublisherTest 
          @Autowired
          private RabbitTemplate rabbitTemplate;
      
          @Test
          public void testSimpleQueue() 
              String queueName = "simple.queue";
              String message = "hello, spring amqp";
              rabbitTemplate.convertAndSend(queueName, message);
          
      
      
    • 消费者

      配置都是一样的

      @Component
      public class SpringRabbitListener 
          @RabbitListener(queues = "simple.queue")
          public void listenSimpleQueue(String msg) 
              System.out.println(msg);
          
      
      

      启动main函数,成功:

3.2 工作消息队列(Work Queue)

两个消费者合作处理消息,避免消息堆积。

AMQP有一个消息预取机制,预取多少条消息是可以配置的。

spring:
  rabbitmq:
    host: 190.92.246.107 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: root
    password: 123456
    listener:
      simple:
        prefetch: 1
  • 发布者:

    @Test
    public void testSimpleQueue() throws InterruptedException 
        String queueName = "simple.queue";
        String message = "hello, spring amqp";
        for (int i = 0; i < 50; i++) 
            rabbitTemplate.convertAndSend(queueName, message + i);
            Thread.sleep(20);
        
    
    
  • 消费者

    @Component
    public class SpringRabbitListener 
        @RabbitListener(queues = "simple.queue")
        public void listenSimpleQueue1(String msg) throws InterruptedException 
            System.out.println("消费者1" + "【" + msg + "】" + LocalTime.now());
            Thread.sleep(20);
        
    
        @RabbitListener(queues = "simple.queue")
        public void listenSimpleQueue2(String msg) throws InterruptedException 
            System.err.println("消费者2" + "【" + msg + "】" + LocalTime.now());
            Thread.sleep(200);
        
    
    

如果消息预取机制不设置,意味着不设限,那么在这个例子中每个消费者无论处理能力如何,都会处理25条消息,设置为1后,则按照能力分配。

3.3 发布订阅(Publish、Subscribe)

和之前不同的是,可以将一条消息发送给多个消费者,实现方式是加入了交换机。

根据交换机类型不同分为三种:广播、路由和主题

  • Fanout Exchange 广播

    这个交换机会将消息路由到每一个和它绑定的队列

    • 发布者

      不同的是,我们发送消息到交换机

      @Test
      public void testSendFanoutExchange() 
          String exchangeName = "root.fanout";
          String message = "hello everyone";
          rabbitTemplate.convertAndSend(exchangeName, "", message);
      
      
    • 订阅者

      首先创建交换机和队列,并将队列绑定到交换机上(有注解的写法,像后文路由模式那样)

      @Configuration
      public class FanoutConfig 
          @Bean
          public FanoutExchange fanoutExchange() 
              return new FanoutExchange("root.fanout");
          
      
          @Bean
          public Queue fanoutQueue1() 
              return new Queue("fanout.queue1");
          
      
          @Bean
          public Queue fanoutQueue2() 
              return new Queue("fanout.queue2");
          
      
          @Bean
          public Binding bindQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) 
              return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
          
      
          @Bean
          public Binding bindQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) 
              return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
          
      
      

      然后监听队列:

      @RabbitListener(queues = "fanout.queue1")
      public void listenFanoutQueue1(String msg) throws InterruptedException 
          System.out.println("fanout.queue1消费者" + "【" + msg + "】" + LocalTime.now());
      
      
      @RabbitListener(queues = "fanout.queue2")
      public void listenFanoutQueue2(String msg) throws InterruptedException 
          System.err.println("fanout.queue2消费者" + "【" + msg + "】" + LocalTime.now());
      
      

      启动测试:

  • Direct Exchange 路由

    特点:

    • 每个Queue都与Exchange设置一个BindingKey
    • 发布者发送消息时,指定消息的RoutingKey
    • Exchange将消息路由到BindingKey与消息Routingkey一致的队列

    接下来就可以测试一下:

    有一个交换机,两个队列,两个消费者分别有两个BindingKey。

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "root.direct", type = ExchangeTypes.DIRECT),
            key = 
                    "blue",
                    "red"
            
    ))
    public void listenDirectQueue1(String msg) 
        System.err.println("direct.queue1消费者" + "【" + msg + "】" + LocalTime.now());
    
    
    
    
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "root.direct", type = ExchangeTypes.DIRECT),
            key = 
                    "yellow",
                    "red"
            
    ))
    public void listenDirectQueue2(String msg) 
        System.err.println("direct.queue2消费者" + "【" + msg + "】" + LocalTime.now());
    
    
    

    发布者:

    @Test
    public void testSendDirectExchange() 
        String exchangeName = "root.direct";
        String message = "hello red";
        rabbitTemplate.convertAndSend(exchangeName, "red", message);
    
    

    不断更换routingKey,观察订阅者日志。

  • Topic Exchange 主题

    和路由模式类似,区别是这个模式的key是多个单词的列表,以 “ . ” 分割。

    在指定BIndingKey时可以使用通配符。例如:#代表0个或多个单词,*代表一个单词。

    • 订阅

      @RabbitListener(bindings = @QueueBinding(
              value = @Queue(name = "topic.queue1"),
              exchange = @Exchange(name = "root.topic", type = ExchangeTypes.TOPIC),
              key = 
                      "china.#"
              
      ))
      public void listenTopicQueue1(String msg) 
          System.err.println("topic.queue1消费者" + "【" + msg + "】" + LocalTime.now());
      
      @RabbitListener(bindings = @QueueBinding(
              value = @Queue(name = "topic.queue2"),
              exchange = @Exchange(name = "root.topic", type = ExchangeTypes.TOPIC),
              key = 
                      "#.news"
              
      ))
      public void listenTopicQueue2(String msg) 
          System.err.println("topic.queue2消费者" + "【" + msg + "】" + LocalTime.now());
      
      
    • 发布

      @Test
      public void testSendTopicExchange() 
          String exchangeName = "root.topic";
          String message = "hello world";
          rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
      
      

      改变routingkey,观察日志。

4. 消息转换器

我们不仅仅可以发送字符串消息,还可以发送对象,默认情况下,需要传统的序列化方式,对象需要实现Serializable接口,不太方便,我们使用json。

  1. 引入依赖

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    
  2. 自定义MessageConverter

    @Bean
    public MessageConverter messageConverter() 
        return new Jackson2JsonMessageConverter();
    
    

    这个时候发送的消息就会经过json序列化了。

  3. 测试

    创建队列

    @Bean
    public Queue fanoutExchange() 
        return new Queue("object.queue");
    
    

    消费者(需要像发布者一样的,引入jackson,然后定义messageConverter)

    @RabbitListener(queues = "object.queue")
    public void listenObjectQueue(Map<String, Object> msg) 
        System.err.println("object.queue消费者" + "【" + msg.get("name") + "】" + LocalTime.now());
        System.err.println("object.queue消费者" + "【" + msg.get("date") + "】" + LocalTime.now());
    
    

    发布消息

    @Test
    public void testSendObj() 
        String queue = "object.queue";
        Map<String, Object> msg = new HashMap<>();
        msg.put("name", "root");
        msg.put("date", new Date());
        rabbitTemplate.convertAndSend(queue, msg);
    
    

    成功:

以上是关于RabbitMQ(黑马spring cloud笔记)的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ学习笔记-p2(SpringAMQP)

RabbitMQ学习笔记-p2(SpringAMQP)

Spring Cloud自用一初识Spring Cloud

Spring Cloud Config + Spring Cloud Bus + RabbitMQ - 不使用本地 Git 存储库自动刷新客户端

Spring Cloud Stream RabbitMQ

spring-cloud-stream 整合 rabbitmq