部署:单主部署一主一从部署双主双从部署代码测试

Posted 鮀城小帅

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了部署:单主部署一主一从部署双主双从部署代码测试相关的知识,希望对你有一定的参考价值。

一.RocketMQ单机部署

Hosts添加信息

我们首先进入/etc/hosts来添加信息

vim /etc/hosts

添加信息如下:

 上传解压安装包

 通过XFTP工具将apache-rocketmq.tar.gz传到/usr/local/software,然后添加/usr/local/apache-rocketmq目录,再将apache-rocketmq.tar.gz解压到此目录下

[root@rich apache-rocketmq]#  mkdir /usr/local/apache-rocketmq && tar -zxvf apache-rocketmq.tar.gz -C /usr/local/apache-rocketmq

再建立一个软连接

[root@rich local]# ln -s apache-rocketmq rocketmq

创建存储路径

[root@rich local]# mkdir /usr/local/rocketmq/store
[root@rich local]# mkdir /usr/local/rocketmq/store/commitlog
[root@rich local]# mkdir /usr/local/rocketmq/store/consumequeue
[root@rich local]# mkdir /usr/local/rocketmq/store/index

更改RocketMQ配置文件

vim /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties

broker-a.properties(主启动文件)更改成如下:

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker 名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master, >0 表示 Slave
brokerId=0
#nameServer 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876
#在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4 点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog 每个文件的大小默认 1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制 Master
#- SYNC_MASTER 同步双写 Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

修改日志配置文件

我们去到 usr/local目录下

[root@rocketmq-nameserver1 local]# mkdir -p /usr/local/rocketmq/logs
[root@rocketmq-nameserver1 local]# cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml

修改启动脚本的参数

[root@rocketmq-nameserver1 bin]# vim /usr/local/rocketmq/bin/runbroker.sh
[root@rocketmq-nameserver1 bin]# vim /usr/local/rocketmq/bin/runserver.sh

将两个文件里面的内存配置都改为1G,至少1G,不然启动会失败。

启动RocketMQ

启动顺序是先启动NameServer,再启动BrokerServer

首先我们进入/usr/local/rocketmq/bin目录下输入下面命令启动NameServer

[root@rocketmq-nameserver1 bin]# nohup sh mqnamesrv >/dev/null 2>&1 &

 然后输入下面命令启动BrokerServer

[root@rocketmq-nameserver1 bin]# nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &

再输入jps,可以看到BrokerServer启动了

关闭RocketMQ

如果我们要关闭服务的话先关BrokerServer,再关闭NamesrvStartup,进入/usr/local/rocketmq/bin目录下输入下面命令:

# sh mqshutdown broker
# sh mqshutdown namesrv

二.单主从模式集群环境构建

我们现在要新增一个Slave节点,我用的是阿里云虚拟机,然后我们先去本地虚拟机把阿里云的节点添加到etc/hosts文件中(在集群中最好显示声明brokerIP1这一个参数,不只是在阿里云服务器,本地虚拟机也需要这么做,不然无法实现从服务的nameserver1读取broker信息)

#0 表示 Master, >0 表示 Slave
brokerId=1
#nameServer 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
brokerIP1=192.168.113.115
#- ASYNC_MASTER 异步复制 Master
#- SYNC_MASTER 同步双写 Master
#- SLAVE
brokerRole=SLAVE

本地虚拟机的broker-a.properties(主启动文件)文件配置需要将更改如下:

#nameServer 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876

最后用broker-a.properties启动本地虚拟机的节点,使用broker-a-s.properties启动阿里云虚拟机的节点,然后通过控制台来看结果:

说明成功啦!

问题:如果rocketmq-console没有显示并且报错,有如下方案:

(1)进入页面报错提示: org.apache.rocketmq.remoting.exception.RemotingConnectException: connection to ip : 10911 failed,

原因如下出现这个错误是由于没有配置外网地址,,特别是使用云服务器的话,必须要配置外网地址,而且要开放端口 9876 ,10911,10909 这三个端口。

验证问题:使用telnet ip:port 来测试该主机的port是否能够正常访问。

解决方案:

  1. 让防火墙为该端口放行。执行firewall-cmd --permanent --zone=public --add-port=3306(port)/tcp
  2. 关闭防火墙。执行 systemctl stop firewalld

三.双主双从集群环境搭建

现在我们来讨论双主双从,两台机器分别部署Broker-Master&NameServer,两台机器分别部署Broker-Slave&NameServe

上图图中,把NameServer单独提出来了,变成一个共有的服务组,但有时候为了节省服务器,我们可以把NameServer与BrokerMater和BrokerSlave部署到同一个节点就。比如两台机器部署master,两台机器部署slave,然后每台机器上都有一个NameServer,这也是我们的部署方案。

我们首先需要四个节点:master1 、master2、slave1、slave2.

master1与slave1是一对主从节点,master2与slave2是一对主从节点,每个节点有一个NameServer。

多主多从模式分为两种方式,第一种为异步复制,第二种为同步双写.

双主模式,文件夹配置为: conf/2m-noslave/

多主多从模式(异步复制),文件夹配置为: conf/2m-2s-async/

多主多从模式(同步双写),文件夹配置为:conf/2m-2s-sync/

我们为了可靠性,选择双写双从,如果想提高性能,可以在后面的刷盘策略上下功夫。关于双写双从的配置我们后面讲。

四个节点的配置和上面一样,只有下面两个操作略有不同

第一个是etc/hosts文件配置。如下

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6


192.168.133.115 rocketmq-nameserver1
192.168.133.115 rocketmq-master1

192.168.133.116 rocketmq-nameserver2
192.168.133.116 rocketmq-master1-slave


192.168.133.117 rocketmq-nameserver3
192.168.133.117 rocketmq-master2

192.168.133.118 rocketmq-nameserver4
192.168.133.118 rocketmq-master2-slave

接下来我们再改同步双写的配置文件

broker-a.properties、 broker-b.properties 配置如下:

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker 名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a|broker-b
#0 表示 Master, >0 表示 Slave
brokerId=0
#nameServer 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876;rocketmq-nameserver3:9876;rocketmq-nameserver4:9876
#在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4 点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog 每个文件的大小默认 1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制 Master
#- SYNC_MASTER 同步双写 Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

broker-a-s.properties、 broker-b-s.properties 配置如下:

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker 名字,注意此处不同的配置文件填写的不一样,与 Master 通过 brokerName 来配对
brokerName=broker-a|broker-b
#0 表示 Master, >0 表示 Slave
brokerId=1
#nameServer 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876;rocketmq-nameserver3:9876;rocketmq-nameserver4:9876
#在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4 点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog 每个文件的大小默认 1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制 Master
#- SYNC_MASTER 同步双写 Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

四、测试代码

生产者——同步发送消息

public class SyncProducer {
 
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new
                DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("192.168.133.115:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

消费消息

public class Consumer {
 
    public static void main(String[] args) throws InterruptedException, MQClientException {
 
        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
 
        // Specify name server addresses.
        consumer.setNamesrvAddr("192.168.133.116:9876");
 
        // Subscribe one more more topics to consume.
        consumer.subscribe("TopicTest", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
 
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                byte[] bt = msgs.get(0).getBody();
                try {
                    System.out.println("-------------" + new String(bt,"UTF-8")+ "------------");
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
 
        //Launch the consumer instance.
        consumer.start();
 
        System.out.printf("Consumer Started.%n");
    }
}

单机与一主一从、多主多从的代码基本上一致,只是消费者所指定的 namesvrAddr 不一定是生产的那台服务所在的 namesvrAddr ,但是经过测试,生产者在 192.168.133.115 机器生产消息,消费者指定的是 192.168.133.116 机器的namesvrAddr ,同样消费成功了。

这一步验证了前面在讲解原理时说过的,消费者主动从 namesvrAddr 拉取broker消息,根据指定的主题“TopicTest” 去对应的 broker 上拉取消息进行消费。这里是主从节点,所以直接在 192.168.133.116 机器上已经同步完成了数据,所以消费者可以直接在该机器上进行消费消息。

以上是关于部署:单主部署一主一从部署双主双从部署代码测试的主要内容,如果未能解决你的问题,请参考以下文章

Mycat中间件实现一主一从和双主双从的读写分离

安装部署RocketMQ集群(双主双从)

Mysql双主双从同步配置

MySQL主从复制搭建之一主一从双主双从搭建

rocketmq 双主双从同步写安装部署

17 Rocketmq集群双主双从同步搭建