自建RocketMQ中间件开发
Posted lisin-lee-cooper
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了自建RocketMQ中间件开发相关的知识,希望对你有一定的参考价值。
一.RocketMQ简介
1.简介
RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。
更多内容可以查看官方文档 RocketMQ官方文档
2.订阅关系
2.1.每个GroupId 对应一个微服务,一个微服务可以有多个节点,但是节点间的订阅关系必须一致,否则接收消息会紊乱
2.2一般一个服务发送一种消息,可以用不同的Tag来确定某种对应的业务模式
3.服务部署图
NameServer
用来保存活跃的 broker 列表,包括 Master 和 Slave ;用来保存所有 topic 和该 topic 所有队列的列表;用来保存所有 broker 的 Filter 列表。
4.原理及意义
1.为了避免过多的请求瞬间打到服务器,给数据库压力,导致崩溃
2.每个微服务无需关心消息底层发送逻辑,直接调用集成插件的api即可实现消息发送
二.代码框架
1.工程结构
框架详细源码 RocketMq中间件源码
三.测试验证
1.消息发送者
1.1 配置系统文件,启动服务时读取
#provicer.properties
mq.config.rmq.env=DEV
mq.config.rmq.groupId=testGroupId666
mq.config.rmq.nameSrv=127.0.0.1:9876
public class ProviderApplication {
public static void main(String[] args) {
try {
System.getProperties().load(new FileInputStream("D:/app/config/provider.properties"));
} catch (IOException e) {
e.printStackTrace();
}
SpringApplication.run(ProviderApplication.class, args);
}
}
1.2 单向消息发送
@GetMapping(value = "/test/pulisher/oneWay")
public String sendMsg(@RequestParam("msg") String msg, @RequestParam("tag") String tag, @RequestParam("topic") String topic) {
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
rmqPublisher.sendOneWayMessage(topic, tag, msg + i);
}
} catch (Exception e) {
log.info("e", e);
e.printStackTrace();
}
}).start();
return "success";
}
2 消息接受者
2.1配置系统文件,启动时读取
## consumer.properties
mq.config.rmq.env=DEV
mq.config.rmq.groupId=testGroupId888
mq.config.rmq.nameSrv=127.0.0.1:9876
2.3消息接收监听器
@Slf4j
@Component
public class ConsumerListener extends AbstractRmqNormalListener {
@Override
protected ConsumeConcurrentlyStatus doConsume(MessageExt messageExt) {
try {
log.info("message received on topic {} , msgTag {},content: {}, msgKey {}, msgID {}", messageExt.getTopic(), messageExt.getTags(), new String(messageExt.getBody(), StandardCharsets.UTF_8),
messageExt.getKeys(), messageExt.getMsgId());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
log.error("ConsumerListener error : {}, message : {}", e, JSON.toJSONString(messageExt));
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
@Override
public List<MQTopicInfo> getTopicList() {
MQTopicInfo mqTopicInfo = new MQTopicInfo();
mqTopicInfo.setName("topic1");
mqTopicInfo.setTags(new HashSet<>(Arrays.asList("*")));
return Arrays.asList(mqTopicInfo);
}
}
3.验证结果
3.1 发送单向消息
模拟发送消息
http://127.0.0.1:8080/test/pulisher/oneWay?topic=topic1&tag=tag2&msg=单向消息测试
2021-07-03 16:37:32.534 INFO 18848 --- [ Thread-23] com.example.mqpublisher.RMQPublisher : oneWayMessage sent to topic TOPIC-DEV-topic1, tag tag2, body "单向消息测试0" success
2021-07-03 16:37:33.535 INFO 18848 --- [ Thread-23] com.example.mqpublisher.RMQPublisher : oneWayMessage sent to topic TOPIC-DEV-topic1, tag tag2, body "单向消息测试1" success
2021-07-03 16:37:34.535 INFO 18848 --- [ Thread-23] com.example.mqpublisher.RMQPublisher : oneWayMessage sent to topic TOPIC-DEV-topic1, tag tag2, body "单向消息测试2" success
2021-07-03 16:37:35.536 INFO 18848 --- [ Thread-23] com.example.mqpublisher.RMQPublisher : oneWayMessage sent to topic TOPIC-DEV-topic1, tag tag2, body "单向消息测试3" success
2021-07-03 16:37:36.536 INFO 18848 --- [ Thread-23] com.example.mqpublisher.RMQPublisher : oneWayMessage sent to topic TOPIC-DEV-topic1, tag tag2, body "单向消息测试4" success
2021-07-03 16:37:37.537 INFO 18848 --- [ Thread-23] com.example.mqpublisher.RMQPublisher : oneWayMessage sent to topic TOPIC-DEV-topic1, tag tag2, body "单向消息测试5" success
2021-07-03 16:37:38.538 INFO 18848 --- [ Thread-23] com.example.mqpublisher.RMQPublisher : oneWayMessage sent to topic TOPIC-DEV-topic1, tag tag2, body "单向消息测试6" success
2021-07-03 16:37:39.538 INFO 18848 --- [ Thread-23] com.example.mqpublisher.RMQPublisher : oneWayMessage sent to topic TOPIC-DEV-topic1, tag tag2, body "单向消息测试7" success
2021-07-03 16:37:40.538 INFO 18848 --- [ Thread-23] com.example.mqpublisher.RMQPublisher : oneWayMessage sent to topic TOPIC-DEV-topic1, tag tag2, body "单向消息测试8" success
2021-07-03 16:37:41.539 INFO 18848 --- [ Thread-23] com.example.mqpublisher.RMQPublisher : oneWayMessage sent to topic TOPIC-DEV-topic1, tag tag2, body "单向消息测试9" success
接收消息
2021-07-03 16:37:32.936 INFO 18328 --- [MessageThread_1] c.e.consumer.listener.ConsumerListener : message received on topic topic1 , msgTag tag2,content: "单向消息测试0", msgKey tag2, msgID 00000000000000000000000000000001000018B4AAC20DDDFEF60014
2021-07-03 16:37:33.540 INFO 18328 --- [MessageThread_2] c.e.consumer.listener.ConsumerListener : message received on topic topic1 , msgTag tag2,content: "单向消息测试1", msgKey tag2, msgID 00000000000000000000000000000001000018B4AAC20DDE02DF0016
2021-07-03 16:37:34.539 INFO 18328 --- [MessageThread_3] c.e.consumer.listener.ConsumerListener : message received on topic topic1 , msgTag tag2,content: "单向消息测试2", msgKey tag2, msgID 00000000000000000000000000000001000018B4AAC20DDE06C70018
2021-07-03 16:37:35.542 INFO 18328 --- [MessageThread_4] c.e.consumer.listener.ConsumerListener : message received on topic topic1 , msgTag tag2,content: "单向消息测试3", msgKey tag2, msgID 00000000000000000000000000000001000018B4AAC20DDE0AB0001A
2021-07-03 16:37:36.543 INFO 18328 --- [MessageThread_5] c.e.consumer.listener.ConsumerListener : message received on topic topic1 , msgTag tag2,content: "单向消息测试4", msgKey tag2, msgID 00000000000000000000000000000001000018B4AAC20DDE0E98001C
2021-07-03 16:37:37.927 INFO 11484 --- [MessageThread_1] c.e.consumer.listener.ConsumerListener : message received on topic topic1 , msgTag tag2,content: "单向消息测试5", msgKey tag2, msgID 00000000000000000000000000000001000018B4AAC20DDE1281001E
2021-07-03 16:37:38.544 INFO 11484 --- [MessageThread_2] c.e.consumer.listener.ConsumerListener : message received on topic topic1 , msgTag tag2,content: "单向消息测试6", msgKey tag2, msgID 00000000000000000000000000000001000018B4AAC20DDE166A0020
2021-07-03 16:37:39.541 INFO 11484 --- [MessageThread_3] c.e.consumer.listener.ConsumerListener : message received on topic topic1 , msgTag tag2,content: "单向消息测试7", msgKey tag2, msgID 00000000000000000000000000000001000018B4AAC20DDE1A520022
2021-07-03 16:37:40.542 INFO 11484 --- [MessageThread_4] c.e.consumer.listener.ConsumerListener : message received on topic topic1 , msgTag tag2,content: "单向消息测试8", msgKey tag2, msgID 00000000000000000000000000000001000018B4AAC20DDE1E3A0024
2021-07-03 16:37:41.544 INFO 11484 --- [MessageThread_5] c.e.consumer.listener.ConsumerListener : message received on topic topic1 , msgTag tag2,content: "单向消息测试9", msgKey tag2, msgID 00000000000000000000000000000001000018B4AAC20DDE22230026
3.2发送同步消息
模拟发送消息
http://127.0.0.1:8080/test/pulisher/sync?topic=topic1&tag=tag2&msg=同步消息测试
2021-07-03 17:07:13.989 INFO 19420 --- [ Thread-21] com.example.mqpublisher.RMQPublisher : message sent to topic TOPIC-DEV-topic1, tag tag2, body "同步消息测试0" success
2021-07-03 17:07:13.989 INFO 19420 --- [ Thread-21] c.e.p.c.TestPublisherController : topic:topic1,tag:tag2,sendResult:SendResult [sendStatus=SEND_OK, msgId=00000000000000000000000000000001000018B4AAC20DF92D020000, offsetMsgId=0A0100C300002A9F000000000000E326, messageQueue=MessageQueue [topic=TOPIC-DEV-topic1, brokerName=broker-a, queueId=9], queueOffset=11]
2021-07-03 17:07:14.992 INFO 19420 --- [ Thread-21] com.example.mqpublisher.RMQPublisher : message sent to topic TOPIC-DEV-topic1, tag tag2, body "同步消息测试1" success
2021-07-03 17:07:14.993 INFO 19420 --- [ Thread-21] c.e.p.c.TestPublisherController : topic:topic1,tag:tag2,sendResult:SendResult [sendStatus=SEND_OK, msgId=00000000000000000000000000000001000018B4AAC20DF931AF0002, offsetMsgId=0A0100C300002A9F000000000000E3FC, messageQueue=MessageQueue [topic=TOPIC-DEV-topic1, brokerName=broker-a, queueId=10], queueOffset=14]
2021-07-03 17:07:15.994 INFO 19420 --- [ Thread-21] com.example.mqpublisher.RMQPublisher : message sent to topic TOPIC-DEV-topic1, tag tag2, body "同步消息测试2" success
2021-07-03 17:07:15.995 INFO 19420 --- [ Thread-21] c.e.p.c.TestPublisherController : topic:topic1,tag:tag2,sendResult:SendResult [sendStatus=SEND_OK, msgId=00000000000000000000000000000001000018B4AAC20DF935990004, offsetMsgId=0A0100C300002A9F000000000000E4D2, messageQueue=MessageQueue [topic=TOPIC-DEV-topic1, brokerName=broker-a, queueId=11], queueOffset=16]
2021-07-03 17:07:16.996 INFO 19420 --- [ Thread-21] com.example.mqpublisher.RMQPublisher : message sent to topic TOPIC-DEV-topic1, tag tag2, body "同步消息测试3" success
2021-07-03 17:07:16.996 INFO 19420 --- [ Thread-21] c.e.p.c.TestPublisherController : topic:topic1,tag:tag2,sendResult:SendResult [sendStatus=SEND_OK, msgId=00000000000000000000000000000001000018B4AAC20DF939830006, offsetMsgId=0A0100C300002A9F000000000000E5A8, messageQueue=MessageQueue [topic=TOPIC-DEV-topic1, brokerName=broker-a, queueId=12], queueOffset=15]
2021-07-03 17:07:18.000 INFO 19420 --- [ Thread-21] com.example.mqpublisher.RMQPublisher : message sent to topic TOPIC-DEV-topic1, tag tag2, body "同步消息测试4" success
2021-07-03 17:07:18.000 INFO 19420 --- [ Thread-21] c.e.p.c.TestPublisherController : topic:topic1,tag:tag2,sendResult:SendResult [sendStatus=SEND_OK, msgId=00000000000000000000000000000001000018B4AAC20DF93D6E0008, offsetMsgId=0A0100C300002A9F000000000000E67E, messageQueue=MessageQueue [topic=TOPIC-DEV-topic1, brokerName=broker-a, queueId=13], queueOffset=15]
2021-07-03 17:07:19.003 INFO 19420 --- [ Thread-21] com.example.mqpublisher.RMQPublisher : message sent to topic TOPIC-DEV-topic1, tag tag2, body "同步消息测试5" success
2021-07-03 17:07:19.003 INFO 19420 --- [ Thread-21] c.e.p.c.TestPublisherController : topic:topic1,tag:tag2,sendResult:SendResult [sendStatus=SEND_OK, msgId=00000000000000000000000000000001000018B4AAC20DF94158000A, offsetMsgId=0A0100C300002A9F000000000000E754, messageQueue=MessageQueue [topic=TOPIC-DEV-topic1, brokerName=broker-a, queueId=14], queueOffset=12]
2021-07-03 17:07:20.005 INFO 19420 --- [ Thread-21] com.example.mqpublisher.RMQPublisher : message sent to topic TOPIC-DEV-topic1, tag tag2, body "同步消息测试6" success
2021-07-03 17:07:20.006 INFO 19420 --- [ Thread-21] c.e.p.c.TestPublisherController : topic:topic1,tag:tag2,sendResult:SendResult [sendStatus=SEND_OK, msgId=00000000000000000000000000000001000018B4AAC20DF94544000C阿里技术行 | Apache RocketMQ 首届开发者沙龙报名启动!
阿里邀请技术人看秀!Dubbo & RocketMQ ,开发者的春天到了?
始于阿里,回归社区 | Apache RocketMQ 开发者沙龙