RabbitMQ 消息队列学习

Posted IT_Holmes

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ 消息队列学习相关的知识,希望对你有一定的参考价值。

文章目录

1. 备份交换机


添加参数给交换机联系上备份交换机:

ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
                    .durable(true) //持久化
                    .withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME)//
                    .build()

配置类:

package com.itholmes.shopping.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 发布确认高级  备份交换机
 */
@Configuration
public class ConfirmConfig 

    //交换机
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
    //队列
    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
    //RoutingKey
    public static final String CONFIRM_ROUTING_KEY = "key1";
    /**
     * 备份交换机
     */
    public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
    /**
     * 备份队列
     */
    public static final String BACKUP_QUEUE_NAME = "backup_queue";
    /**
     * 报警队列
     */
    public static final String WARNING_QUEUE_NAME = "warning_queue";

    /**
     * 声明交换机
     *      这里需要将该交换机与备份交换机联系起来。
     *  如果rabbitmq服务里面有了该交换机,现在已经该了配置,就要去rabbitmq服务删除该队列,重新创建。
     */
    @Bean("confirmExchange")
    public DirectExchange confirmExchange()
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
                    .durable(true) //持久化
                    .withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME)//
                    .build();
    
    //声明队列
    @Bean("confirmQueue")
    public Queue confirmQueue()
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    
    //绑定
    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,@Qualifier("confirmExchange") DirectExchange confirmExchange)
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    

    /**
     * 声明备份交换机
     */
    @Bean("backupExchange")
    public FanoutExchange backupExchange()
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    
    /**
     * 声明备份队列
     */
    @Bean("backupQueue")
    public Queue backupQueue()
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    
    /**
     * 声明报警队列
     */
    @Bean("warningQueue")
    public Queue warningQueue()
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    

    /**
     * 备份队列的绑定
     */
    @Bean
    public Binding backupQueueBindingBackupExchange(@Qualifier("backupQueue") Queue backupQueue,@Qualifier("backupExchange") FanoutExchange backupExchange)
        return BindingBuilder.bind(backupQueue).to(backupExchange);
    

    /**
     * 报警队列的绑定
     */
    @Bean
    public Binding warningQueueBindingBackupExchange(@Qualifier("warningQueue") Queue warningQueue,@Qualifier("backupExchange") FanoutExchange backupExchange)
        return BindingBuilder.bind(warningQueue).to(backupExchange);
    


报警消费者:

package com.itholmes.shopping.consumer;

import com.itholmes.shopping.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 报警消费者
 */
@Slf4j
@Component
public class WarningConsumer 
    //接受报警消息
    @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
    public void receiveWarningMsg(Message message)
        String msg = new String(message.getBody());
        log.error("报警发现不可路由消息:",msg);
    

生产者:

  • 发送一个有可用路由(routingKey),在发送一个不可用路由(routingKey)来测试判断。
package com.itholmes.shopping.controller;

import com.itholmes.shopping.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

/**
 * 发布确认高级:生产者 开始发消息 测试确认
 */
@Slf4j
@RestController
public class ProducerController 

    @Autowired
    RabbitTemplate rabbitTemplate;

    //发消息
    @GetMapping("/sendMessage/message")
    public void sendMessage(@PathVariable String message)

        //对应回调接口的correlationData
        CorrelationData correlationData = new CorrelationData("1");

        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData);

        log.info("发送消息为:",message+"key1");


        //模拟队列错误,专门绑定一个不存在的routingkey
        CorrelationData correlationData2 = new CorrelationData("2");

        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY+"2",message,correlationData2);

        log.info("发送消息为:",message+"key2");

    


注意:

  • 当回退消息和备份交换机同时配置,备份交换机优先级高。

2. Rabbitmq的 幂等性


幂等性就是:

  • 就是用户对同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。

解决办法:

  • 方法一:唯一ID+指纹码机制,利用数据库主键去重。
  • 方法二:利用redis的原子性去实现。(最佳策略)

方法一:唯一ID+指纹码机制,利用数据库主键去重。

  • 使用时间戳、UUID、或者mq的id来判断,每次消费消息时,用该唯一标识,判断该消息是否已经消费过。
  • 指纹码介绍如下:

方法二:利用redis的原子性去实现。(最佳策略)

3. rabbitmq的 优先级队列

3.1 优先级队列的 原理


使用场景:

  • 订单催付场景。

优先级队列的范围是0~255 , 越大越优先执行。

3.2 优先级队列 代码实现


页面配置最大优先级:


两个过程:给队列设置最大优先级,给消息设置优先级(不能超过最大优先级)。

代码实现:

  • 在声明队列的时候,添加map类型参数,设置该队列最大优先级。

  • 生产者消息添加优先级,配置Properties。

注意:

  • 测试时,最好先把消费者停止了,这样生产者发送的消息才会在队列中堆积起来,之后启动消费者就能看到测试效果了。

4. rabbitmq的 惰性队列


惰性队列:将消息保存在磁盘中。

正常情况:消息是保存在内存中。

惰性队列,先将消息从磁盘读取到内存,然后在发送给消费者,所以相比较而言,很慢。

但是也有惰性队列的使用场景:

  • 当消费者由于各种各样的原因,例如:消费者下线,宕机亦或者是由于维护而关闭,从致使长时间内不能消费消息造成堆积时(也就是队列堆积很多了消息),这种情况就用到了惰性队列。

队列的两种模式:

  • default模式,正常模式 。
  • lazy模式,惰性模式。

页面添加惰性队列:

代码声明惰性队列:


惰性队列和正常队列的内存开销对比:

  • 正常队列快,惰性队列慢。
  • 内存开销,如下图,在消息庞大的时候,惰性队列的内存开销小很多!

5. rabbitmq的 集群


下图就是以第一个node1来主体,然后将node2和node3加入到node1中。

  • 当然,也可以在node2,node3下面也可以添加mq节点,无论访问那个mq节点,都是相当于访问的这个集群。

首先,准备三台机器,每台机器安装好rabbitmq。

第一步:将他们都配置好vim /etc/hosts 文件的IP和地址名。

第二步:确保各个节点的cookie文件使用的是同一个值。


第三步:启动rabbitmq服务,三台机器都启动。

第四步:将node2和node3与node1联系起来。

第五步:查看集群状态。

第六步:为该集群创建账户。因为之前执行了rabbitmq的reset命令,所有的信息都被重置了,之前的用户也就没有了。


解除集群节点:

6. rabbitmq的 镜像队列


我们在集群里面创建队列,在node1节点创建队列,就在node1节点里面,并不会存在node2,node3中。

这样如果node1宕机了,该队列和队列里面的消息就没了。


这样就用到了镜像队列,将node1的队列复制一份到node2或者node3中。

这时候我们再次创建新队列,名称必须符合匹配规则。

该队列就会备份。在那个node节点备份,由服务器决定。

这样当正常队列出现问题后,镜像队列就会立刻补充上,达到备份效果。

6. Haproxy实现高可用负载均衡


之前所配置的代码,仅仅是连接了一个IP地址的node节点,如果该IP节点宕机了,那么代码就连接不到rabbitmq了。


Haproxy的使用原理图:

  • 该方案适合当生产者无法实现连接多台IP服务器的场景。
  • 当前rabbitmq集群有多台服务器,但是生产者只能访问其中一个节点,所以可以使用该方案进行架构。

7. Federation Exchange 联邦交换机

7.1 Federation Exchange 准备


实现上面的效果就可以通过Federation Exchange 联邦交换机实现。


开启federation的插件。

安装完成后,服务页面就多了如下部分:

北京broker同步给深圳,那么北京就是上流,深圳是下流。

7.2 联邦交换机的原理


联邦交换机的原理图

7.3 联邦交换机的 实现


按照原理图实现:


第一步:先构建node2节点的downstream下流:

  • 在节点node2,创建一个fed_exchange交换机。
  • 创建node2_queue,绑定routeKey。

第二步:创建联邦策略。根据pattern正则匹配对应下流交换机。


第三步:在rabbitmq页面,创建一个上游。node1节点为上游,根据联邦策略判断谁为下游。


第四步:查看status,检查是否创建成功。

8. Federation queue 联邦队列


同样一个联邦队列可以连接一个或者多个上游队列。


原理图:


和联邦交换机一样,先创建一个联邦策略:

其他过程也同样。

9. Shovel(英文直译:铲子)

9.1 Shovel 准备


Shovel行为就像优秀的客户端应用程序能够负责连接源端和目的地、负责消息的读写及负责连接失败问题的处理。

其实Shovel的感觉和上下流差不多。

原理图:

  • Q1是源端,Q2是目的地。

rabbitmq的开启插件:

(这样Web页面就有了Shovel Status ,Shovel Management的功能。)

9.1 Shovel 的实现


点击Shovel Management添加一个Shovel:

查看添加的Shovel状态:

这样就实现了一个Shovel的效果图。

以上是关于RabbitMQ 消息队列学习的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ之消息可靠性死信交换机惰性队列及集群

RabbitMQ之消息可靠性死信交换机惰性队列及集群

Rabbitmq之发布确认高级回退消息备份交换机幂等性优先级队列惰性队列

RabbitMQ 镜像队列和交换

RabbitMQ-消息堆积&高可用

RabbitMQ学习(下)——发布确认高级幂等性优先级惰性和RabbitMQ集群