配置两台Linux CentOS 6.7虚拟主机
CentOS6.7下载地址
https://pan.baidu.com/s/1i5GPg9n
安装视频下载
https://pan.baidu.com/s/1qYSgohQ
rabbitmq2
rabbitmq1
1、分别在两台主机上修改/etc/hosts
192.168.169.100 rabbitmq1
192.168.169.110 rabbitmq2
2、从客户端上传RPM包
https://pan.baidu.com/s/1dE1iaGx
3、下载阿里云Yum源
#wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repo
依次安装Erlang,Rabbitmq
#yum -y install openssl
#yum -y install socat-1.7.2.4-1.el6.rf.x86_64.rpm
#yum -y install erlang-17.4-1.el6.x86_64.rpm
#yum -y install rabbitmq-server-3.6.3-1.noarch.rpm
4、启动rabbitmq1,rabbitmq2上的RabbitMQ
rabbitmq1
#service rabbitmq-server start
rabbitmq2
#service rabbitmq-server start
5、从rabbitmq1主机上拷贝文件到rabbitmq2
scp /var/lib/rabbitmq/.erlang.cookie rabbitmq2:/var/lib/rabbitmq
6、在rabbitmq1,rabbitmq2上分别关闭防火墙
[root@rabbitmq1 ~]# service iptables stop
[root@rabbitmq2 ~]# service iptables stop
7、在rabbitmq1,rabbitmq2上分别启动RabbitMQ
(服务已在第4步启动,需要重启才能使拷贝过来的.erlang.cookie生效)
[root@rabbitmq1 ~]# service rabbitmq-server restart
[root@rabbitmq2 ~]# service rabbitmq-server restart
8、在rabbitmq2上执行
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbitmq1
rabbitmqctl start_app
9、查看各节点上的状态
rabbitmqctl cluster_status
10、在rabbitmq1,rabbitmq2节点上分别添加用户和设置控制台插件
# rabbitmq-plugins enable rabbitmq_management
# rabbitmqctl add_user admin admin
# rabbitmqctl set_permissions admin ".*" ".*" ".*"
# rabbitmqctl set_user_tags admin administrator
11、在rabbitmq1节点上安装haproxy
yum -y install haproxy
12、配置haproxy
cp /etc/haproxy/haproxy.cfg /etc/haproxy/haproxy.cfg.bak
vi /etc/haproxy/haproxy.cfg
添加配置信息
listen rabbitmq_local_cluster 192.168.169.100:5670
    mode tcp
    balance roundrobin
    server rabbit 192.168.169.100:5672 check inter 5000 rise 2 fall 3
    server rabbit 192.168.169.110:5672 check inter 5000 rise 2 fall 3
listen private_monitoring :8100
    mode http
    option httplog
    stats enable
    stats uri /stats
    stats refresh 60s
13、启动haproxy
service haproxy start
14、查看haproxy控制台
http://192.168.169.100:8100/stats
15、建立RabbitMQ策略
16、建立持久队列
测试代码
Producer.java
package com.test.cluster;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.lang.String;
import java.lang.System;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
public class Producer {
public static void
main(String[] args) throws Exception {
//使用默认端口连接MQ
ConnectionFactory factory = new
ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("admin");
factory.setHost("192.168.169.100");
//使用默认端口5672
factory.setPort(5670);
Connection conn = factory.newConnection();
//声明一个连接
Channel channel = conn.createChannel();
//声明消息通道
String exchangeName = "TestEXG";//交换机名称
String routingKey =
"RouteKey1";//RoutingKey关键字
channel.exchangeDeclare(exchangeName, "direct",
true);//定义声明交换机
String queueName = "ClusterQueue";//队列名称
Map arg = new HashMap();
arg.put("x-ha-policy", "all");
channel.queueDeclare(queueName, false, false,
false, arg);
channel.queueBind(queueName, exchangeName,
routingKey);//定义声明对象
byte[] messageBodyBytes = "Hello,
world!".getBytes();//消息内容
channel.basicPublish(exchangeName, routingKey,
null, messageBodyBytes);//发布消息
//关闭通道和连接
channel.close();
conn.close();
}
}
Consumer.java
package com.test.cluster;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
 * Receives messages from {@code ClusterQueue} and acknowledges each one with
 * {@code channel.basicAck}, which lets the server delete the message.
 *
 * <p>The program connects to {@code 192.168.169.100:5670}, declares the durable direct
 * exchange {@code TestEXG}, binds {@code ClusterQueue} to it with the routing key
 * {@code RouteKey1} and then prints every delivery in an endless loop. The queue itself is
 * not declared here, so it has to exist already (the Producer in this guide declares it).
 */
public class Consumer {

    /**
     * Entry point; loops until the process is stopped or a broker call fails.
     *
     * @param args command-line arguments (unused)
     * @throws IOException if a broker operation fails
     * @throws InterruptedException if the thread is interrupted while waiting for a delivery
     * @throws java.util.concurrent.TimeoutException if opening or closing the connection
     *         times out (declared by newer client versions)
     */
    public static void main(String[] args)
            throws IOException, InterruptedException, java.util.concurrent.TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setHost("192.168.169.100");
        // 5670 is the HAProxy listener set up in this guide, not RabbitMQ's default 5672.
        factory.setPort(5670);

        // Open the connection; the finally block below releases it when the loop exits.
        Connection conn = factory.newConnection();
        try {
            // Open a channel on the connection.
            Channel channel = conn.createChannel();

            String exchangeName = "TestEXG";   // exchange name
            String queueName = "ClusterQueue"; // queue name

            // Declare a durable direct exchange and bind the existing queue to it.
            channel.exchangeDeclare(exchangeName, "direct", true);
            channel.queueBind(queueName, exchangeName, "RouteKey1");

            // Prefetch limit of 1 for messages pushed by the server.
            channel.basicQos(1);

            // Buffers the deliveries pushed by the server.
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // autoAck = false: every message is acknowledged explicitly below.
            channel.basicConsume(queueName, false, consumer);

            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String body = new String(
                        delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("Received " + body);
                // Send the ack; without it the server keeps the message.
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        } finally {
            // The loop only ends through an exception; close the connection unless it is
            // already gone, so the original failure is not masked.
            if (conn.isOpen()) {
                conn.close();
            }
        }
    }
}
关闭掉其中一个RabbitMQ,测试群集效果