SpringCloud系列[MQ 篇] - 详述 RabbitMQ 五种模型的结构及具体实现

Posted Fug_Lee

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringCloud系列[MQ 篇] - 详述 RabbitMQ 五种模型的结构及具体实现相关的知识,希望对你有一定的参考价值。

各种模型的具体实现的前提是你的虚拟机已经部署了 RabbitMQ 并启动.具体部署步骤请看此文章:SpringCloud系列(十)[MQ 篇] - RabbitMQ 初步学习及详细部署步骤.

RabbitMQ 五种模型

前提: 启动 RabbitMQ;

如果没有启动, 则执行指令 docker ps -a 找到容器的 ID, 然后执行 docker start [ID] 启动.
启动成功后进行登录, 如下图所示:

🐰 BasicQueue 简单队列模型

模型:

前提: 新建一个 simple.queue 队列;

消息发送:
步骤一: 在父工程 pom.xml 中引入 AMQP 依赖 (注意这里面已经包含了 RabbitMQ);

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

步骤二: 配置 RabbitMQ 地址, 在发布者(Publisher) 服务的 yml 文件中添加配置:

logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  rabbitmq:
    host: 172.16.**.**  # rabbitMQ 的 ip 地址, 也就是你的虚拟机 ip 地址
    port: 5672  # 端口号
    username: myRabbitMQ
    password: 123456 # 这里的用户名和密码是 docker 运行 RabbitMQ 容器时自己设定的, 具体指令看上篇文章
    virtual-host: /

步骤三: 在 Publisher 服务中编写测试类, 并利用 RabbitTemplate 实现消息的发送;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest 
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2SimpleQueue() 
        String queueName = "simple.queue";
        String message = "hello, spring amqp!";
        rabbitTemplate.convertAndSend(queueName, message);
    

消息接收:
步骤一: 配置 RabbitMQ 地址, 在消费者(Consumer) 服务的 yml 文件中添加配置:

logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  rabbitmq:
    host: 172.16.**.***  # rabbitMQ 的 ip 地址
    port: 5672  # 端口
    username: myRabbitMQ
    password: 123456
    virtual-host: /
    listener:
      direct:
        prefetch: 1

步骤二: 在 consumer 服务中新建 listener 包, 并新建 SpringRabbitListener 类用来接收消息;

@Component
public class SpringRabbitListener 
    @RabbitListener(queues="simple.queue")
    public void listenSimpleQueue(String msg) 
        System.out.println("消费者接收到 simple.queue 的消息: [" + msg + "]");
    

测试:
启动 consumer 服务, 运行 publisher 中的测试代码, 进行消息的发送和接收;

🐰🐰 WorkQueue 工作消息队列模型

WorkQueue 又称 TaskQueue 任务模型, 也就是说让多个消费者绑定到一个队列来共同消费队列中的消息; 主要针对的问题是当消息处理比较耗时的时候, 可能生产消息的速度远远大于消息的消费速度, 这样队列中会堆积越来越多的消息无法及时处理, 而此模型多个消费者共同处理消息, 速度就能大大提高.
模型:

前提: 新建一个 work.queue 队列;

消息发送:
因为要解决的就是大量消息的堆积问题, 因此这里我们循环发送信息;

@Test
    public void testSendMessage2WorkQueue() throws InterruptedException 
        String queueName = "work.queue";
        String message = "hello, message --- ";
        for (int i = 0; i <= 50; i++) 
            rabbitTemplate.convertAndSend(queueName, message + i);
            Thread.sleep(20);
        
    

消息接收:
消息的接收也要模拟两个消费者共同绑定一个队列, 于是在 consumer 的 SpringRabbitListener 中添加了两个方法:

@RabbitListener(queues="work.queue")
    public void listenSimpleQueue1(String msg) throws InterruptedException 
        System.out.println("消费者11111接收到 work.queue 的消息: [" + msg + "]" + LocalTime.now());
        Thread.sleep(20);
    

    @RabbitListener(queues="work.queue")
    public void listenSimpleQueue2(String msg) throws InterruptedException 
        System.err.println("消费者22222接收到 work.queue 的消息: [" + msg + "]" + LocalTime.now());
        Thread.sleep(200);
    

测试:


通过上面的结果可以看得出来, consumer1 很快就完成了自己的消息, 而 consumer2 在缓慢的处理自己的消息; 也就是说 consumer1 和 consumer2 都是处理相同数量的消息, 但是如果考虑到消费者的处理能力, 这样显然是有问题的, 因此最好的方式就是"能者多劳", 处理能力强的 consumer 处理的数量多才对, 因此需要在 consumer 的 yml 文件中需要添加以下配置, 目的是每个 consumer 每次只能获取一个消息, 只有处理完成了才能获取下一个消息.

测试结果如下:

很明显, consumer1 的处理能力比较强, 因此处理的消息也就比 consumer2 多得多.
总之:

  • 多个消费者绑定到一个队列上, 同一条消息只会被一个消费者处理;
  • 通过在 yml 中设置 prefetch 来控制消费者预取的消息数量.

🐰🐰🐰 Fanout Exchange 扇形交换机(广播模型)


消息发送:
在广播模式下, 消息的发送流程如下:

  • 可以有多个队列, 但是多个队列都绑定到了 Exchange 交换机上;
  • 发布者发送的消息只能发送到交换机, 交换机来决定要发给哪个队列, 发布者无法决定;
  • 交换机将消息发送到绑定过的所有队列, 订阅这些队列的消费者就可以拿到消息;
  • 交换机不能缓存消息, 当路由失败时, 消息也就丢失了.

步骤一: 声明队列和交换机
在 consumer 的 config 包中新建 FanoutConfig 类, 声明两个队列和一个交换机;

@Configuration
public class FanoutConfig 
    /**
     * 声明交换机
     */
    @Bean
    public FanoutExchange fanoutExchange() 
        return new FanoutExchange("myrabbitmq.fanout");
    

    /**
     * 声明第一个队列
     */
    @Bean
    public Queue fanoutQueue1() 
        return new Queue("fanout.queue1");
    

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) 
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    

    /**
     * 声明第二个队列
     */
    @Bean
    public Queue fanoutQueue2() 
        return new Queue("fanout.queue2");
    

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) 
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    

步骤二: 消息发送
在 publisher 测试类中添加消息发送的测试方法;

@Test
    public void testFanoutExchange() 
        // 交换机名称
        String exchangeName = "myrabbitmq.fanout";
        String message = "您好, 我是 FanoutExchange";
        // FanoutExchange 路由键设置为空即可
        rabbitTemplate.convertAndSend(exchangeName,"",message);
    

步骤三: 消息接收

/**
     * FanoutExchange 消息接收
     */
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) 
        System.out.println("consumer1 接收到 Fanout 的消息: [" + msg + "]");
    
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg) 
        System.out.println("consumer2 接收到 Fanout 的消息: [" + msg + "]");
    

测试:


在 RabbitMQ 管理网站上也可以看到我们的交换机和队列信息:

🐰🐰🐰🐰 Direct Exchange 直连交换机(路由模型)

在广播模式下我们可以知道一条消息可以被多个消费者订阅, 但是在一些场景下我们希望不同的消息被不同的队列消费, 这时候 Direct Exchange 就出现了.

**消息发送:**

在 Direct Exchange 模式下:

  • 队列与交换机的绑定不再是任意绑定了, 而是需要指定一个 路由键 (RoutingKey);
  • 发布者向 Exchange 发送消息时, 也必须指定消息的路由键;
  • 交换机不再讲消息给每一个绑定的队列, 而是根据消息的路由键进行判断, 只有队列的路由键和消息的路由键完全一致才会接收到消息.

步骤一: 在 publisher 服务的测试类中添加消息发送测试;

/**
     * DirectExchange
     */
    @Test
    public void testSendDirectExchange() 
        // 交换机名称
        String exchangeName = "myrabbitmq.direct";
        String message = "您好, 这里是 DirectExchange";

        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName,"routingkey1",message);
    

消息接收:
步骤一: 声明队列和交换机 (这次我们基于注解的形式来进行声明)
在 consumer 的 SpringRabbitListener 中添加两个消费者,同时基于注解来声明队列和交换机;

/**
     * 1. 基于注解的方式声明队列和交换机
     * 2. DirectExchange 消息接收
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "myrabbitmq.direct", type = ExchangeTypes.DIRECT),
            key = "routingkey1","routingkey2" // 可以设置任意个路由键
    ))
    public void listenDirectQueue1(String msg) 
        System.out.println("消费者接收到了 direct.queue1 的消息: [" + msg + "]");
    
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "myrabbitmq.direct", type = ExchangeTypes.DIRECT),
            key = "routingkey3","routingkey4" // 可以设置任意个路由键
    ))
    public void listenDirectQueue2(String msg) 
        System.out.println("消费者接收到了 direct.queue2 的消息: [" + msg + "]");
    

测试:
路由键为 routingkey1 时发送消息:

路由键为 routingkey3 时发送消息:

在 RabbitMQ 管理网站上也可以看到我们的交换机和队列信息:

绑定信息:

🐰🐰🐰🐰🐰 Topic Exchange 主题交换机(主题模型)

TopicExchange 类型的 Exchange 与 DirectExchange 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过TopicExchange 类型的 Exchange 可以让队列在绑定 Routing key 的同时使用通配符!

这里的 RoutingKey 一般都是由一个或者多个单词组成, 多个单词之间以 “.” 分割, 规则如下:

  • #: 匹配一个或多个词, 如 topic.# 能够匹配 topic.aa.bb 或者 topic.aa;
  • *: 匹配一个词, 只能匹配成 topic.aa 这种形式

消息发送:

/**
     * TopicExchange
     */
    @Test
    public void testSendTopicExchange() 
        // 交换机名称
        String exchangeName = "myrabbitmq.topic";
        String message = "您好, 这里是 TopicExchange";
        
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName,"AABBcc.key1",message);
    

消息接收:

/**
     * 1. 基于注解的方式声明队列和交换机
     * 2. TopicExchange 消息接收
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "myrabbitmq.topic", type = ExchangeTypes.TOPIC),
            key = "#.key1" // 所有后缀是 key1 的都可以接收到
    ))
    public void listenTopicQueue1(String msg) 
        System.out.println("消费者接收到了 topic.queue1 的消息: [" + msg + "]");
    
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "myrabbitmq.topic", type = ExchangeTypes.TOPIC),
            key = "key2.*" // 类似于 key2.aa  /  key2.bb 这样的可以接收到
    ))
    public void listenTopicQueue2(String msg) 
        System.out.println("消费者接收到了 topic.queue2 的消息: [" + msg + "]");
    

测试:


在 RabbitMQ 管理网站上也可以看到我们的交换机和队列信息:

以上是关于SpringCloud系列[MQ 篇] - 详述 RabbitMQ 五种模型的结构及具体实现的主要内容,如果未能解决你的问题,请参考以下文章

重学SpringCloud系列八之微服务网关安全认证-JWT篇

Java面试题超详细讲解微服务系列之十六SpringCloud篇

MQ系列11:如何保证消息可靠性传输(除夕奉上)

SpringCloud Alibaba系列Dubbo高级特性篇

SpringCloud Alibaba系列Dubbo基础入门篇

ESP 保姆级教程 预告疯狂Node.js服务器篇 ——案例:ESP8266 + MQ系列 + NodeJs本地服务 + 文件存储数据