RocketMQ 4.5.1 单机环境搭建以及生产消费测试

Posted lua123

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ 4.5.1 单机环境搭建以及生产消费测试相关的知识,希望对你有一定的参考价值。

为了学习和方便测试,总是要启动一个单机版的。下载 http://rocketmq.apache.org/dowloading/releases/

1. 要先配置环境变量

ROCKETMQ_HOME

E:\\rocketmq-all-4.5.1-bin-release

技术图片

2. 然后进入bin目录启动NameServer

技术图片

3. 启动Broker

启动

E:\\rocketmq-all-4.5.1-bin-release\\bin>mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true

可能会出现一个错误: 找不到或无法加载主类 Files\\Java\\jdk1.8.0_161\\lib;C:\\Program

 

解决方法:(打开bin目录的runserver.cmd

技术图片

修改成

技术图片

重新启动,成功

技术图片

4. 弄个管控台方便查看

https://github.com/apache/rocketmq-externals

下载好进入 rocketmq-console 目录打包

mvn clean package -Dmaven.test.skip=true

技术图片

技术图片

进入target目录,启动 (最后的参数的nameserver的地址,也就是我本机地址)

E:\\rocketmq-externals-master\\rocketmq-console\\target>java -jar rocketmq-console-ng-1.0.1.jar --rocketmq.config.namesrvAddr=localhost:9876

技术图片

最后访问 http://localhost:8080 即可

技术图片

5. 简单测试

引入依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.1</version>
</dependency>

一个简单的生产者

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;

public class Test 

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException 
        // 设置生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("my_producer_group");
        // 设置NameServer地址
        producer.setNamesrvAddr("10.204.241.15:9876");
        // 启动
        producer.start();
        for (int i = 0; i < 10; i++) 
            // 创建一条消息,包含topic、tag以及消息内容
            Message msg = new Message("MyTopic", "MyTag", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 发送结果
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        
        // 不用的时候关闭
        producer.shutdown();
    

查看管控台

技术图片

查看详细

技术图片

下面一个简单的消费者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;
import java.util.List;

public class Test2 

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException 
        // 设置生产者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_producer_group");
        // 设置NameServer地址
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅的主题
        consumer.subscribe("MyTopic", "*");
        // 注册消息监听
        consumer.registerMessageListener(new MessageListenerConcurrently() 
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) 
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            
        );
        // 启动
        consumer.start();
        System.out.printf("Consumer Started.%n");
    

控制台输出

技术图片

不要关闭消费者,查看管控台

技术图片

 

以上是关于RocketMQ 4.5.1 单机环境搭建以及生产消费测试的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ单机环境搭建

搭建 Apache RocketMQ 单机环境

搭建 Apache RocketMQ 单机环境

15 Centos单机搭建Rocketmq运行环境

RocketMQ 4.5.1 双主双从异步复制环境搭建

rocketMQ基础环境