RocketMQ 4.5.1 单机环境搭建以及生产消费测试
Posted lua123
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ 4.5.1 单机环境搭建以及生产消费测试相关的知识,希望对你有一定的参考价值。
为了学习和方便测试,总是要启动一个单机版的。下载 http://rocketmq.apache.org/dowloading/releases/
1. 要先配置环境变量
ROCKETMQ_HOME
E:\\rocketmq-all-4.5.1-bin-release
2. 然后进入bin目录启动NameServer
3. 启动Broker
启动
E:\\rocketmq-all-4.5.1-bin-release\\bin>mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
可能会出现一个错误: 找不到或无法加载主类 Files\\Java\\jdk1.8.0_161\\lib;C:\\Program
解决方法:(打开bin目录的runserver.cmd)
修改成
重新启动,成功
4. 弄个管控台方便查看
https://github.com/apache/rocketmq-externals
下载好进入 rocketmq-console 目录打包
mvn clean package -Dmaven.test.skip=true
进入target目录,启动 (最后的参数的nameserver的地址,也就是我本机地址)
E:\\rocketmq-externals-master\\rocketmq-console\\target>java -jar rocketmq-console-ng-1.0.1.jar --rocketmq.config.namesrvAddr=localhost:9876
最后访问 http://localhost:8080 即可
5. 简单测试
引入依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.1</version> </dependency>
一个简单的生产者
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; public class Test public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException // 设置生产者组名 DefaultMQProducer producer = new DefaultMQProducer("my_producer_group"); // 设置NameServer地址 producer.setNamesrvAddr("10.204.241.15:9876"); // 启动 producer.start(); for (int i = 0; i < 10; i++) // 创建一条消息,包含topic、tag以及消息内容 Message msg = new Message("MyTopic", "MyTag", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); // 发送结果 SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); // 不用的时候关闭 producer.shutdown();
查看管控台
查看详细
下面一个简单的消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; import java.util.List; public class Test2 public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException // 设置生产者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_producer_group"); // 设置NameServer地址 consumer.setNamesrvAddr("localhost:9876"); // 订阅的主题 consumer.subscribe("MyTopic", "*"); // 注册消息监听 consumer.registerMessageListener(new MessageListenerConcurrently() public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; ); // 启动 consumer.start(); System.out.printf("Consumer Started.%n");
控制台输出
不要关闭消费者,查看管控台
以上是关于RocketMQ 4.5.1 单机环境搭建以及生产消费测试的主要内容,如果未能解决你的问题,请参考以下文章