Rocketmq 案例实战

Posted Dream_it_possible!

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rocketmq 案例实战相关的知识,希望对你有一定的参考价值。

 

目录

一、批量发送消息

二、批量消费消息


客户端版本:  4.3.0

服务端版本: 4.3.0

一、批量发送消息

 使用DefaultMQProducer批量发送消息

public class SimpleBatchProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        //If you just send messages of no more than 1MiB at a time, it is easy to use batch
        //Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));

        producer.send(messages);
    }
}

启动报错:No route info of this topic: BatchTest 

此问题是因为没有topic导致报的错!检查broker情况:

发现启动了! 经查需要把server和客户端的版本给对应上,客户单的版本是release-4.8.0, server的版本是release -4.3.0。

使用git tag 命令查看所有版本:

使用命令切换到4.3.0版本:

get checkout rocketmq-all-4.3.0

切换成功后,依赖的版本也会变成4.3.0:

  

重新下载对应地版本插件后,即可解决  No route info of this topic: BatchTest 的问题!

查看consule:

 消息已经批量发送到rocketmq!

 

二、批量消费消息

         使用DefaultMQPushConsumer 来批量消费消息,因为消费者在rocketmq里也是有组的概念,可以待会根据打印结果分析。

          订阅topic和监听器,订阅和消费消息

package org.apache.rocketmq.example.batch;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @author bingbing
 * @date 2021/6/13 0013 22:33
 */
public class SimpleBatchConsumer {

    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumergroupone");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        try {
            consumer.subscribe("BatchTest", "*");
            consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
                System.out.println(Thread.currentThread().getName() + ":" + msg);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });

            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }

    }

}

 可以发现在消费过程中,使用了多线程技术,也有线程重用的情况,消费完毕后,可以从consule那里查看已经被消费的消息状态,trackType类型为consumed。

 

 

以上是关于Rocketmq 案例实战的主要内容,如果未能解决你的问题,请参考以下文章

Express实战 - 应用案例- realworld-API - 路由设计 - mongoose - 数据验证 - 密码加密 - 登录接口 - 身份认证 - token - 增删改查API(代码片段

RocketMQ入门到入土事务消息&顺序消息

深入理解RocketMQ---实战(控制台搭建)

深入理解RocketMQ---实战(控制台搭建)

千亿级微服务中的RocketMQ实践(130多页的案例PPT代码)

Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息