Rocketmq 案例实战
Posted Dream_it_possible!
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rocketmq 案例实战相关的知识,希望对你有一定的参考价值。
目录
客户端版本: 4.3.0
服务端版本: 4.3.0
一、批量发送消息
使用DefaultMQProducer批量发送消息
public class SimpleBatchProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
//If you just send messages of no more than 1MiB at a time, it is easy to use batch
//Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
producer.send(messages);
}
}
启动报错:No route info of this topic: BatchTest
此问题是因为没有topic导致报的错!检查broker情况:
发现启动了! 经查需要把server和客户端的版本给对应上,客户单的版本是release-4.8.0, server的版本是release -4.3.0。
使用git tag 命令查看所有版本:
使用命令切换到4.3.0版本:
get checkout rocketmq-all-4.3.0
切换成功后,依赖的版本也会变成4.3.0:
重新下载对应地版本插件后,即可解决 No route info of this topic: BatchTest 的问题!
查看consule:
消息已经批量发送到rocketmq!
二、批量消费消息
使用DefaultMQPushConsumer 来批量消费消息,因为消费者在rocketmq里也是有组的概念,可以待会根据打印结果分析。
订阅topic和监听器,订阅和消费消息
package org.apache.rocketmq.example.batch;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* @author bingbing
* @date 2021/6/13 0013 22:33
*/
public class SimpleBatchConsumer {
public static void main(String[] args) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumergroupone");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
try {
consumer.subscribe("BatchTest", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
System.out.println(Thread.currentThread().getName() + ":" + msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
}
可以发现在消费过程中,使用了多线程技术,也有线程重用的情况,消费完毕后,可以从consule那里查看已经被消费的消息状态,trackType类型为consumed。
以上是关于Rocketmq 案例实战的主要内容,如果未能解决你的问题,请参考以下文章
Express实战 - 应用案例- realworld-API - 路由设计 - mongoose - 数据验证 - 密码加密 - 登录接口 - 身份认证 - token - 增删改查API(代码片段