搭建 Apache RocketMQ 单机环境
Posted 鮀城小帅
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了搭建 Apache RocketMQ 单机环境相关的知识,希望对你有一定的参考价值。
环境需求:
64位操作系统,建议使用Linux / Unix /
-
CentOs7.3
-
64bit JDK 1.8+
-
Maven 3.2.x
一、安装Maven
参考链接:
二、安装RocketMQ
1、关闭防火墙
systemctl stop firewalld.service
2、下载和构建
wget http://mirrors.hust.edu.cn/apache/rocketmq/4.2.0/rocketmq-all-4.2.0-source-release.zip
unzip rocketmq-all-4.2.0-source-release.zip
cd rocketmq-all-4.2.0
mvn -Prelease-all -DskipTests clean install -U
mv distribution/target/apache-rocketmq /usr/local/apache-rocketmq
cd /usr/local/apache-rocketmq/
编译成功的响应
3、配置rocketmq的环境变量,在/etc/profile最后添加
export ROCKETMQ_HOME=/usr/local/apache-rocketmq
export PATH=$JAVA_HOME/bin:$ROCKETMQ_HOME/bin:$PATH
4、使rocketmq的环境变量生效
source /etc/profile
三、内存问题:
注意:
启动NameServer 和Broker的时候可能会出现错误,请留意对应的日志文件。在测试环境中常见的错误是内存不足的错误,这时候可以修改NameSever和Broker的启动脚本。Xms\\Mmx不小于1g。
另外:
#mqbroker.xml和mqnamesrv.xml的内存不要超过runbroker.sh 和runserver.sh的内存,不然会引起内存不够导致奔溃。
bin/mqnamesrv.xml
bin/mqbroker.xml
四、启动Name Server
1、修改runserver,默认 RocketMQ Server 内存需要很大的
vim bin/runserver.sh
--------------------------------------------------------------
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
2、启动 Name Server
#nohup来启动
nohup sh bin/mqnamesrv >/dev/null 2>&1 &
#查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log
启动成功信息如下
The Name Server boot success. serializeType=JSON
五、启动broker
1、修改runbroker,默认 RocketMQ Broker 内存需要很大的。
vim bin/runbroker.sh
--------------------------------------------------------------
JAVA_OPT="${JAVA_OPT} -server -Xms11g -Xmx1g -Xmn512m"
2、启动Broker
#nohup来启动
nohup sh bin/mqbroker -n localhost:9876 >/dev/null 2>&1 &
#查看日志
tail -f ~/logs/rocketmqlogs/broker.log
启动成功信息如下
2021-07-07 15:26:06 INFO main - Set user specified name server address: 192.168.133.116:9876
2021-07-07 15:26:06 INFO PullRequestHoldService - PullRequestHoldService service started
2021-07-07 15:26:06 INFO main - register broker to name server 192.168.133.116:9876 OK
2021-07-07 15:26:06 INFO main - The broker[env1, 192.168.133.116:10911] boot success. serializeType=JSON and name server is 192.168.133.116:9876
六、查看进程
[root@rich apache-rocketmq]# jps
3441 BrokerStartup
3606 Jps
3383 NamesrvStartup
[root@rich apache-rocketmq]# netstat -ntlp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 0.0.0.0:111 0.0.0.0:* LISTEN 1/systemd
tcp 0 0 192.168.122.1:53 0.0.0.0:* LISTEN 1270/dnsmasq
tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN 991/sshd
tcp 0 0 127.0.0.1:631 0.0.0.0:* LISTEN 993/cupsd
tcp 0 0 0.0.0.0:3306 0.0.0.0:* LISTEN 1126/mysqld
tcp6 0 0 :::111 :::* LISTEN 1/systemd
tcp6 0 0 :::9876 :::* LISTEN 3383/java
tcp6 0 0 :::22 :::* LISTEN 991/sshd
tcp6 0 0 ::1:631 :::* LISTEN 993/cupsd
tcp6 0 0 :::10909 :::* LISTEN 3441/java
tcp6 0 0 :::10911 :::* LISTEN 3441/java
tcp6 0 0 :::10912 :::* LISTEN 3441/java
七、发送和收取消息
在发送和收取消息之前,我们需要告诉客户端Name Server的位置。RocketMQ有多种办法来实现,在这里我们使用最简单的环境变量 NAMESRV_ADDR
。
export NAMESRV_ADDR=localhost:9876
#生产者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
#消费者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
八、关闭服务
#关闭nameserver
[root@rich bin]# ./mqshutdown namesrv
#关闭broker
[root@rich bin]# ./mqshutdown broker
九、生产者与消费者代码
(1)同步发送消息
可靠的同步传输应用于广泛的场景,如重要通知消息、短信通知、短信营销系统等。
public class SyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new
DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("192.168.133.117:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
(2)异步发送消息
异步传输一般用于响应时间敏感的业务场景
public class AsyncProducer {
public static void main(String[] args) throws Exception {
//使用生产者组名称进行实例化。
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 指定服务器地址。
producer.setNamesrvAddr("192.168.133.117:9876");
//启动实例。
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
try {
final int index = i;
Message msg = new Message("Jodie_topic_1023",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
}
(3)单向发送消息
单向传输用于需要中等可靠性的情况,例如日志收集。
public class OnewayProducer {
public static void main(String[] args) throws Exception{
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("192.168.133.117:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("wu ----- Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
producer.sendOneway(msg);
}
//Wait for sending to complete
Thread.sleep(5000);
producer.shutdown();
}
}
(4)消费消息
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// Specify name server addresses.
consumer.setNamesrvAddr("192.168.133.117:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
byte[] bt = msgs.get(0).getBody();
try {
System.out.println("-------------" + new String(bt,"UTF-8")+ "------------");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
以上代码来自rocketmq官网。
以上是关于搭建 Apache RocketMQ 单机环境的主要内容,如果未能解决你的问题,请参考以下文章