部署:单主部署一主一从部署双主双从部署代码测试
Posted 鮀城小帅
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了部署:单主部署一主一从部署双主双从部署代码测试相关的知识,希望对你有一定的参考价值。
一.RocketMQ单机部署
Hosts添加信息
我们首先进入/etc/hosts来添加信息
vim /etc/hosts
添加信息如下:
上传解压安装包
通过XFTP工具将apache-rocketmq.tar.gz传到/usr/local/software,然后添加/usr/local/apache-rocketmq目录,再将apache-rocketmq.tar.gz解压到此目录下
[root@rich apache-rocketmq]# mkdir /usr/local/apache-rocketmq && tar -zxvf apache-rocketmq.tar.gz -C /usr/local/apache-rocketmq
再建立一个软连接
[root@rich local]# ln -s apache-rocketmq rocketmq
创建存储路径
[root@rich local]# mkdir /usr/local/rocketmq/store
[root@rich local]# mkdir /usr/local/rocketmq/store/commitlog
[root@rich local]# mkdir /usr/local/rocketmq/store/consumequeue
[root@rich local]# mkdir /usr/local/rocketmq/store/index
更改RocketMQ配置文件
vim /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties
broker-a.properties(主启动文件)更改成如下:
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker 名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master, >0 表示 Slave
brokerId=0
#nameServer 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876
#在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4 点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog 每个文件的大小默认 1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制 Master
#- SYNC_MASTER 同步双写 Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
修改日志配置文件
我们去到 usr/local目录下
[root@rocketmq-nameserver1 local]# mkdir -p /usr/local/rocketmq/logs
[root@rocketmq-nameserver1 local]# cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml
修改启动脚本的参数
[root@rocketmq-nameserver1 bin]# vim /usr/local/rocketmq/bin/runbroker.sh
[root@rocketmq-nameserver1 bin]# vim /usr/local/rocketmq/bin/runserver.sh
将两个文件里面的内存配置都改为1G,至少1G,不然启动会失败。
启动RocketMQ
启动顺序是先启动NameServer,再启动BrokerServer
首先我们进入/usr/local/rocketmq/bin目录下输入下面命令启动NameServer
[root@rocketmq-nameserver1 bin]# nohup sh mqnamesrv >/dev/null 2>&1 &
然后输入下面命令启动BrokerServer
[root@rocketmq-nameserver1 bin]# nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &
再输入jps,可以看到BrokerServer启动了
关闭RocketMQ
如果我们要关闭服务的话先关BrokerServer,再关闭NamesrvStartup,进入/usr/local/rocketmq/bin目录下输入下面命令:
# sh mqshutdown broker
# sh mqshutdown namesrv
二.单主从模式集群环境构建
我们现在要新增一个Slave节点,我用的是阿里云虚拟机,然后我们先去本地虚拟机把阿里云的节点添加到etc/hosts文件中(在集群中最好显示声明brokerIP1这一个参数,不只是在阿里云服务器,本地虚拟机也需要这么做,不然无法实现从服务的nameserver1读取broker信息)
#0 表示 Master, >0 表示 Slave
brokerId=1
#nameServer 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
brokerIP1=192.168.113.115
#- ASYNC_MASTER 异步复制 Master
#- SYNC_MASTER 同步双写 Master
#- SLAVE
brokerRole=SLAVE
本地虚拟机的broker-a.properties(主启动文件)文件配置需要将更改如下:
#nameServer 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
最后用broker-a.properties启动本地虚拟机的节点,使用broker-a-s.properties启动阿里云虚拟机的节点,然后通过控制台来看结果:
说明成功啦!
问题:如果rocketmq-console没有显示并且报错,有如下方案:
(1)进入页面报错提示: org.apache.rocketmq.remoting.exception.RemotingConnectException: connection to ip : 10911 failed,
原因如下:出现这个错误是由于没有配置外网地址,,特别是使用云服务器的话,必须要配置外网地址,而且要开放端口 9876 ,10911,10909 这三个端口。
验证问题:使用telnet ip:port 来测试该主机的port是否能够正常访问。
解决方案:
- 让防火墙为该端口放行。执行firewall-cmd --permanent --zone=public --add-port=3306(port)/tcp
- 关闭防火墙。执行 systemctl stop firewalld
三.双主双从集群环境搭建
现在我们来讨论双主双从,两台机器分别部署Broker-Master&NameServer,两台机器分别部署Broker-Slave&NameServe
上图图中,把NameServer单独提出来了,变成一个共有的服务组,但有时候为了节省服务器,我们可以把NameServer与BrokerMater和BrokerSlave部署到同一个节点就。比如两台机器部署master,两台机器部署slave,然后每台机器上都有一个NameServer,这也是我们的部署方案。
我们首先需要四个节点:master1 、master2、slave1、slave2.
master1与slave1是一对主从节点,master2与slave2是一对主从节点,每个节点有一个NameServer。
多主多从模式分为两种方式,第一种为异步复制,第二种为同步双写.
双主模式,文件夹配置为: conf/2m-noslave/
多主多从模式(异步复制),文件夹配置为: conf/2m-2s-async/
多主多从模式(同步双写),文件夹配置为:conf/2m-2s-sync/
我们为了可靠性,选择双写双从,如果想提高性能,可以在后面的刷盘策略上下功夫。关于双写双从的配置我们后面讲。
四个节点的配置和上面一样,只有下面两个操作略有不同
第一个是etc/hosts文件配置。如下
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.133.115 rocketmq-nameserver1
192.168.133.115 rocketmq-master1
192.168.133.116 rocketmq-nameserver2
192.168.133.116 rocketmq-master1-slave
192.168.133.117 rocketmq-nameserver3
192.168.133.117 rocketmq-master2
192.168.133.118 rocketmq-nameserver4
192.168.133.118 rocketmq-master2-slave
接下来我们再改同步双写的配置文件
broker-a.properties、 broker-b.properties 配置如下:
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker 名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a|broker-b
#0 表示 Master, >0 表示 Slave
brokerId=0
#nameServer 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876;rocketmq-nameserver3:9876;rocketmq-nameserver4:9876
#在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4 点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog 每个文件的大小默认 1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制 Master
#- SYNC_MASTER 同步双写 Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
broker-a-s.properties、 broker-b-s.properties 配置如下:
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker 名字,注意此处不同的配置文件填写的不一样,与 Master 通过 brokerName 来配对
brokerName=broker-a|broker-b
#0 表示 Master, >0 表示 Slave
brokerId=1
#nameServer 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876;rocketmq-nameserver3:9876;rocketmq-nameserver4:9876
#在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4 点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog 每个文件的大小默认 1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制 Master
#- SYNC_MASTER 同步双写 Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
四、测试代码
生产者——同步发送消息
public class SyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new
DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("192.168.133.115:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
消费消息
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// Specify name server addresses.
consumer.setNamesrvAddr("192.168.133.116:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
byte[] bt = msgs.get(0).getBody();
try {
System.out.println("-------------" + new String(bt,"UTF-8")+ "------------");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
单机与一主一从、多主多从的代码基本上一致,只是消费者所指定的 namesvrAddr 不一定是生产的那台服务所在的 namesvrAddr ,但是经过测试,生产者在 192.168.133.115 机器生产消息,消费者指定的是 192.168.133.116 机器的namesvrAddr ,同样消费成功了。
这一步验证了前面在讲解原理时说过的,消费者主动从 namesvrAddr 拉取broker消息,根据指定的主题“TopicTest” 去对应的 broker 上拉取消息进行消费。这里是主从节点,所以直接在 192.168.133.116 机器上已经同步完成了数据,所以消费者可以直接在该机器上进行消费消息。
以上是关于部署:单主部署一主一从部署双主双从部署代码测试的主要内容,如果未能解决你的问题,请参考以下文章