11.RabbitMQ单机集群

Posted 奋斗的一线码农

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了11.RabbitMQ单机集群相关的知识,希望对你有一定的参考价值。

RabbitMQ集群设计用于完成两个目标:允许消费者和生产者在RabbitMQ节点崩溃的情况下继续运行,以及通过添加更多的节点来扩展消息通信的吞吐量。

RabbitMQ会始终记录以下四种类型的内部元数据:

1.         队列元数据-队列的名称和它们的属性(是否持久化,是否自动删除)

2.         交换器元数据-交换器类型、名称和属性(可持久化等)

3.         绑定元数据-一张简单的表格展示了如何将消息路由到队列

4.         vhost元数据-为vhost内的队列、交换器和绑定提供命名空间和安全属性

 在单一节点内,RabbitMQ会将所有这些信息存储在内存中,同时将那些标记为可持久化的队列和交换器(以及它们的绑定)存储到硬盘上。当你引入集群时,RabbitMQ需要追踪新的元数据类型:集群节点位置,以及节点与已记录的其他类型元数据的关系。集群提供了选择:将元数据存储到磁盘上,或者存储在内存中。

Erlang Cookie

Erlang Cookie是保证不同节点可以相互通信的密钥,要保证集群中的不同节点相互通信必须共享相同的Erlang Cookie。具体的目录存放在/var/lib/rabbitmq/.erlang.cookie。

说明: 这就要从rabbitmqctl命令的工作原理说起,RabbitMQ底层是通过Erlang架构来实现的,所以rabbitmqctl会启动Erlang节点,并基于Erlang节点来使用Erlang系统连接RabbitMQ节点,在连接过程中需要正确的Erlang Cookie和节点名称,Erlang节点通过交换Erlang Cookie以获得认证。

镜像队列

功能和原理 
RabbitMQ的Cluster集群模式一般分为两种,普通模式和镜像模式。

  • 普通模式:默认的集群模式,以两个节点(rabbit01、rabbit02)为例来进行说明。对于Queue来说,消息实体只存在于其中一个节点rabbit01(或者rabbit02),rabbit01和rabbit02两个节点仅有相同的元数据,即队列的结构。当消息进入rabbit01节点的Queue后,consumer从rabbit02节点消费时,RabbitMQ会临时在rabbit01、rabbit02间进行消息传输,把A中的消息实体取出并经过B发送给consumer。所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连rabbit01或rabbit02,出口总在rabbit01,会产生瓶颈。当rabbit01节点故障后,rabbit02节点无法取到rabbit01节点中还未消费的消息实体。如果做了消息持久化,那么得等rabbit01节点恢复,然后才可被消费;如果没有持久化的话,就会产生消息丢失的现象。

  • 镜像模式:将需要消费的队列变为镜像队列,存在于多个节点,这样就可以实现RabbitMQ的HA高可用性。作用就是消息实体会主动在镜像节点之间实现同步,而不是像普通模式那样,在consumer消费数据时临时读取。缺点就是,集群内部的同步通讯会占用大量的网络带宽。

 

内存节点和磁盘节点

每个RabbitMQ节点,要么是内存节点(ram node),要么是磁盘节点(disk node)。内存节点将所有的队列、交换器、绑定、用户、权限和vhost的元数据定义都仅存在内存中。而磁盘节点则将元数据存储在磁盘中。

 内存节点的效率更高,内存节点唯一存储到磁盘上的是磁盘节点的地址。

 

RabbitMQ要求集群中至少有一个磁盘节点。当节点加入或者离开集群时,它们必须要将该变更通知到至少一个磁盘节点。如果只有一个磁盘节点,而且不凑巧的是它又崩溃了,那么集群可以继续路由消息,但是不能做以下操作了:

1.         创建队列

2.         创建交换器

3.         创建绑定

4.         添加用户

 

5.         更改权限

 

单机环境搭建多节点群集

1、禁用管理后台插件rabbitmq-plugins disable rabbitmq_management

2、创建三个Shell文件


rabbitmq1.sh

#!/bin/bash

export RABBITMQ_NODE_PORT=5672

export RABBITMQ_NODENAME=rabbit 

 

rabbitmq-server

 

rabbitmq2.sh

#!/bin/bash

export RABBITMQ_NODE_PORT=5673

export RABBITMQ_NODENAME=rabbit2

 

rabbitmq-server

 

rabbitmq3.sh

#!/bin/bash

export RABBITMQ_NODE_PORT=5674

export RABBITMQ_NODENAME=rabbit3

 

rabbitmq-server

 

3、停止在Erlang节点上运行的节点2和节点3 RabbitMQ Server 并清空(重置)它们的元数据

rabbitmqctl -n [email protected] stop_app

rabbitmqctl -n [email protected] stop_app

 

rabbitmqctl -n [email protected] reset

rabbitmqctl -n [email protected] reset

 

4、将节点2作为磁盘节点加入集群并启动应用

rabbitmqctl -n [email protected] join_cluster [email protected]

rabbitmqctl -n [email protected] start_app

 

5、将节点3作为内存节点加入集群并启动应用

rabbitmqctl -n [email protected] join_cluster --ram [email protected]

rabbitmqctl -n [email protected] start_app

 

6、运行命令rabbitmqctl cluster_status查看集群状态

Cluster status of node [email protected] ...

[{nodes,[{disc,[[email protected],[email protected]]},

         {ram,[[email protected]]}]},

 {running_nodes,[[email protected],[email protected],[email protected]]},

 {cluster_name,<<"[email protected]">>},

 {partitions,[]},

 {alarms,[{[email protected],[]},

          {[email protected],[]},

          {[email protected],[]}]}]

 

 

集群安装成功,这时候java客户端可以连接任何一个RabbitMQ Server的端口来访问集群了。

 

7、镜像队列

在声明队列时,可以通过参数"x-ha-policy"设置为"all"来把消息发送到集群的所有节点上。

Map arg = new HashMap();

arg.put("x-ha-policy", "all");

channel.queueDeclare(queueName, false, false, false, arg);

 

客户端发送代码

package com.test.cluster;

 

import com.rabbitmq.client.*;

 

import java.io.IOException;

import java.lang.String;

import java.lang.System;

import java.util.HashMap;

import java.util.Map;

import java.util.Scanner;

 

public class Producer {

 

    public static void main(String[] args) throws Exception {

   

    //使用默认端口连接MQ

        ConnectionFactory factory = new ConnectionFactory();

    factory.setUsername("admin");

    factory.setPassword("admin");

        factory.setHost("192.168.169.142"); //使用默认端口5672

        Connection conn = factory.newConnection(); //声明一个连接

        Channel channel = conn.createChannel(); //声明消息通道

   

        String exchangeName = "TestEXG";//交换机名称

        String routingKey = "RouteKey1";//RoutingKey关键字

        channel.exchangeDeclare(exchangeName, "direct", true);//定义声明交换机

        String queueName = "ClusterQueue";//队列名称

        Map arg = new HashMap();

        arg.put("x-ha-policy", "all");

        channel.queueDeclare(queueName, false, false, false, arg);

 

        channel.queueBind(queueName, exchangeName, routingKey);//定义声明对象

        

        byte[] messageBodyBytes = "Hello, world!".getBytes();//消息内容

        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);//发布消息

        //关闭通道和连接

channel.close();

conn.close();

    }

 

 

}

 

消费者代码

package com.test.cluster;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.QueueingConsumer;

 

import java.io.IOException;

import java.util.HashMap;

import java.util.Map;

 

//通过channel.basicAck向服务器发送回执,删除服务上的消息

public class Consumer {

 

    public static void main(String[] args) throws IOException, InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();

    factory.setUsername("admin");

    factory.setPassword("admin");

        factory.setHost("192.168.169.142"); //使用默认端口5672

        factory.setPort(5672);

        Connection conn = factory.newConnection(); //声明一个连接

        Channel channel = conn.createChannel(); //声明消息通道

        String exchangeName = "TestEXG";//交换机名称

        String queueName = "ClusterQueue";//队列名称

        channel.exchangeDeclare(exchangeName, "direct", true);//定义声明交换机

        channel.queueBind(queueName, exchangeName, "RouteKey1");

 

        channel.basicQos(1); //server push消息时的队列长度

 

        //用来缓存服务器推送过来的消息

        QueueingConsumer consumer = new QueueingConsumer(channel);

 

        channel.basicConsume(queueName, false, consumer);

 

        while (true) {

            QueueingConsumer.Delivery delivery = consumer.nextDelivery();

            System.out.println("Received " + new String(delivery.getBody()));

 

            //回复ack包,如果不回复,消息不会在服务器删除

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

        }

    }

 

}

以上是关于11.RabbitMQ单机集群的主要内容,如果未能解决你的问题,请参考以下文章

rabbitmq单机和集群部署

rabbitmq单机和集群部署

RabbitMQ安装以及集群部署

RabbitMQ入门教程(十四):RabbitMQ单机集群搭建

Rabbitmq 相关介绍之单机集群配置

在windows环境中单机搭建rabbitmq集群