RabbitMQ之Federation ExchangeFederation QueueShovel

Posted 别团等shy哥发育

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ之Federation ExchangeFederation QueueShovel相关的知识,希望对你有一定的参考价值。

1、Federation Exchange(联邦交换机)

1.1 为什么使用联邦交换机

  (broker 北京),(broker 深圳)彼此之间相距甚远,网络延迟是一个不得不面对的问题。有一个在北京 的业务(Client 北京) 需要连接(broker 北京),向其中的交换器 exchangeA 发送消息,此时的网络延迟很小, (Client 北京)可以迅速将消息发送至 exchangeA 中,就算在开启了 publisherconfirm 机制或者事务机制的 情况下,也可以迅速收到确认信息。此时又有个在深圳的业务(Client 深圳)需要向 exchangeA 发送消息, 那么(Client 深圳) (broker 北京)之间有很大的网络延迟,(Client 深圳) 将发送消息至 exchangeA 会经历一 定的延迟,尤其是在开启了 publisherconfirm 机制或者事务机制的情况下,(Client 深圳) 会等待很长的延 迟时间来接收(broker 北京)的确认信息,进而必然造成这条发送线程的性能降低,甚至造成一定程度上的 阻塞。

  将业务(Client 深圳)部署到北京的机房可以解决这个问题,但是如果(Client 深圳)调用的另些服务都部 署在深圳,那么又会引发新的时延问题,总不见得将所有业务全部部署在一个机房,那么容灾又何以实现? 这里使用 Federation 插件就可以很好地解决这个问题.

1.2 搭建步骤

1.2.1 需要保证每台节点单独运行

1.2.2 在每台机器上开启federation相关插件

rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_managemen

装完之后看下管理界面

1.2.3 原理图(先运行consumer在node2创建fed_exchange)

消费者代码:

public class Consumer 

    //队列的名称
    public static final String QUEUE_NAME="mirrior_hello";
    //交换机的名称
    public static final String FED_EXCHANGE="fed_exchange";
    //接收消息
    public static void main(String[] args) throws IOException, TimeoutException 
        //创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("192.168.159.34");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        channel.exchangeDeclare(FED_EXCHANGE,BuiltinExchangeType.DIRECT);
        channel.queueDeclare("node2_queue",true,false,false,null);
        channel.queueBind("node2_queue",FED_EXCHANGE,"routeKey");

        //声明 接收消息
        DeliverCallback deliverCallback=(consumerTag, message)->
            System.out.println(new String(message.getBody()));
        ;
        //取消消息时的回调
        CancelCallback cancelCallback=consumerTag -> 
            System.out.println("消息消费被中断");
        ;


        /**
         * 消费者消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true代表自动应答 false手动应答
         * 3.消费者成功消费的回调
         * 4.消费者取消消费的回调
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

    

1.2.4 在downstream(node2)配置upstream(node1)

1.2.5 添加policy

查看下是否搭建成功,点击Federation Status

2、Federation Queue(联邦队列)

2.1 为什么使用联邦队列

  联邦队列可以在多个 Broker 节点(或者集群)之间为单个队列提供均衡负载的功能。一个联邦队列可以 连接一个或者多个上游队列(upstream queue),并从这些上游队列中获取消息以满足本地消费者消费消息 的需求。

2.2 搭建步骤

2.2.1 原理图

2.2.2 添加upstream

这一步和1.2.4一致

2.2.3 添加policy

3、Shovel

3.1 为什么使用Shovel

  Federation 具备的数据转发功能类似,Shovel 够可靠、持续地从一个 Broker 中的队列(作为源端,即 source)拉取数据并转发至另一个 Broker 中的交换器(作为目的端,即 destination)。作为源端的队列和作 为目的端的交换器可以同时位于同一个 Broker,也可以位于不同的 Broker 上。Shovel 可以翻译为"铲子", 是一种比较形象的比喻,这个"铲子"可以将消息从一方"铲子"另一方。Shovel 行为就像优秀的客户端应用 程序能够负责连接源和目的地、负责消息的读写及负责连接失败问题的处理。

3.2 搭建步骤

3.2.1 开启的插件(需要的机器都开启)

rabbitmq-plugins enable rabbitmq_shovel

rabbitmq-plugins enable rabbitmq_shovel_management

3.2.2 原理图(在源头发送的消息直接会进入到目的地队列)

3.2.3 添加shovel源和目的地

检查下是否搭建成功

以上是关于RabbitMQ之Federation ExchangeFederation QueueShovel的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ集群搭建镜像队列实现高可用负载均衡Federation ExchangeFederation QueueShovel

消息队列之RabbitMQ-分布式部署

RabbitMQ集群架构之使用Haproxy实现高可用负载均衡

RabbitMQ 消息的参数详解

如何测试 rabbitmq 的性能

MQ 实现信息延迟投递