保姆级Windows下安装RocketMQ(附简单小Demo)
Posted 默辨
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了保姆级Windows下安装RocketMQ(附简单小Demo)相关的知识,希望对你有一定的参考价值。
Windows下安装RocketMQ
1、下载RocketMQ
下载地址:https://github.com/apache/rocketmq
可选择自己想要的版本进行加载
最终你会得到这样一个文件内容
2、启动服务
1、配置MQ的环境变量
- ROCKETMQ_HOME=安装路径\\rocketmq-all-4.9.0-bin-release
- PATH=%ROCKETMQ_HOME%\\bin
2、启动nameser
- 找到文件:安装路径\\rocketmq-all-4.9.0-bin-release\\bin\\mqnamesrv.cmd
- 输入命令:start mqnamesrv.cmd
3、配置broker
- 找到文件:安装路径\\rocketmq-all-4.9.0-bin-release\\bin\\mqbroker.cmd
- 输入命令:start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
// 指定broker配置文件的形式启动
start mqbroker.cmd -n localhost:9876 -c ../conf/broker.conf
3、测试RocketMQ自带功能
该功能为RocketMQ自带的消息测试类,可用来测试对应的消息生产和消费是否正常
1、消费端消息
提示Consumer Started表示开始开始接收消息,用于消费
set NAMESRV_ADDR=localhost:9876
tools.cmd org.apache.rocketmq.example.quickstart.Consumer
2、生产端消息
当生产消息的一端发送消息后,对应的消费端会开始消息消息,具体详情如下图
set NAMESRV_ADDR=localhost:9876
tools.cmd org.apache.rocketmq.example.quickstart.Producer
4、RocketMQ控制台
最终效果如下图,该客户端可以可视化的查看对应的消息数据
1、下载控制台项目
https://github.com/apache/rocketmq-externals
2、修改项目端口信息
这是一个使用Java编写的SpringBoot项目。
需要修改 安装目录\\rocketmq-externals-rocketmq-console-1.0.0\\rocketmq-console\\src\\test\\resources下的application.properties文件,完成和nameServer进行绑定
# 配置RocketMQ-console项目启动端口(端口自定义,避免和其他端口冲突)
server.port=5555
# 绑定对应url的nameServer
rocketmq.config.namesrvAddr=127.0.0.1:9876
3、编译项目
直接使用maven的mvn命令编译我们的SpringBoot项目,直到出现BUILD SUCCESS即为编译成功
mvn clean package -Dmaven.test.skip=true
最后在rocketmq-console目录下会多一个traget项目,并且内部含有打包好的jar包。
4、使用Java命令启动项目
java -jar rocketmq-console-ng-1.0.0.jar
5、访问前面配置的5555端口
http://localhost:5555/#/
5、编写对应的Java测试Demo
前期的Java项目创建部分直接跳过
1、引入pom依赖
需要与RocketMQ版本相同
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.0</version>
</dependency>
2、编写消息消费端
public class ConsumerTest
public static void main(String[] args) throws MQClientException
// 定义一个pull消费者
// DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("cg");
// 定义一个push消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("MobianProducer");
// 指定nameServer
consumer.setNamesrvAddr("127.0.0.1:9876");
// 指定从第一条消息开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 指定消费topic与tag
consumer.subscribe("TopicTest", "Tag");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently()
// 一旦broker中有了其订阅的消息就会触发该方法的执行,
// 其返回值为当前consumer消费的状态
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context)
// 逐条消费消息
for (MessageExt msg : msgs)
System.out.println(msg);
// 返回消费状态:消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
// 开启消费者消费
consumer.start();
System.out.println("Consumer Started");
3、编写消息生产端
public class ProducerTest
public static void main(String[] args) throws Exception
// 创建一个producer,参数为Producer Group名称
DefaultMQProducer producer = new DefaultMQProducer("MobianProducer");
// 指定nameServer地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 设置当发送失败时重试发送的次数,默认为2次
producer.setRetryTimesWhenSendFailed(3);
// 设置发送超时时限为5s,默认3s
producer.setSendMsgTimeout(5000);
// 开启生产者
producer.start();
// 生产并发送100条消息
for (int i = 0; i < 100; i++)
byte[] body = ("Hi," + i).getBytes();
Message msg = new Message("TopicTest", "Tag", body);
// 为消息指定key
msg.setKeys("key-" + i);
// 同步发送消息
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
// 关闭producer
producer.shutdown();
4、测试效果如下图
测试Demo数据发送了100条。我们可以看到消息在1000条(第3步自带的测试类中的消息)的基础上增加了100条
Java项目测试结果如下:
MQ-console控制台效果如下:
对应的消息数据:
以上是关于保姆级Windows下安装RocketMQ(附简单小Demo)的主要内容,如果未能解决你的问题,请参考以下文章
Ubuntu18.04安装Anacondacudacudnnpytorch pycharm(附:保姆级图文流程)
Jenkins教程之windows下安装Jenkins保姆级教程
超简单,保姆级❤️Linux 安装 Windows 软件,微信QQTIM等,再也不用来回切换了!❤️建议收藏