自建RocketMQ中间件开发

Posted lisin-lee-cooper

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了自建RocketMQ中间件开发相关的知识,希望对你有一定的参考价值。

一.RocketMQ简介

1.简介
RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。

更多内容可以查看官方文档 RocketMQ官方文档

2.订阅关系

RocketMQ正确订阅关系图

2.1.每个GroupId 对应一个微服务,一个微服务可以有多个节点,但是节点间的订阅关系必须一致,否则接收消息会紊乱

2.2一般一个服务发送一种消息,可以用不同的Tag来确定某种对应的业务模式

3.服务部署图

RocketMQ部署图

NameServer
用来保存活跃的 broker 列表,包括 Master 和 Slave ;用来保存所有 topic 和该 topic 所有队列的列表;用来保存所有 broker 的 Filter 列表。

4.原理及意义

1.为了避免过多的请求瞬间打到服务器,给数据库压力,导致崩溃

2.每个微服务无需关心消息底层发送逻辑,直接调用集成插件的api即可实现消息发送

二.代码框架

1.工程结构
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
框架详细源码 RocketMq中间件源码

三.测试验证

1.消息发送者

1.1 配置系统文件,启动服务时读取

#provicer.properties
mq.config.rmq.env=DEV
mq.config.rmq.groupId=testGroupId666
mq.config.rmq.nameSrv=127.0.0.1:9876
public class ProviderApplication {

    public static void main(String[] args) {
        try {
            System.getProperties().load(new FileInputStream("D:/app/config/provider.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        SpringApplication.run(ProviderApplication.class, args);
    }

}

1.2 单向消息发送

@GetMapping(value = "/test/pulisher/oneWay")
    public String sendMsg(@RequestParam("msg") String msg, @RequestParam("tag") String tag, @RequestParam("topic") String topic) {


        new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    Thread.sleep(1000);
                    rmqPublisher.sendOneWayMessage(topic, tag, msg + i);
                }
            } catch (Exception e) {
                log.info("e", e);
                e.printStackTrace();
            }
        }).start();
        return "success";
    }

2 消息接受者

2.1配置系统文件,启动时读取

## consumer.properties
mq.config.rmq.env=DEV
mq.config.rmq.groupId=testGroupId888
mq.config.rmq.nameSrv=127.0.0.1:9876

2.3消息接收监听器

@Slf4j
@Component
public class ConsumerListener extends AbstractRmqNormalListener {

    @Override
    protected ConsumeConcurrentlyStatus doConsume(MessageExt messageExt) {
        try {
            log.info("message received on topic {} , msgTag {},content: {}, msgKey {}, msgID {}", messageExt.getTopic(), messageExt.getTags(), new String(messageExt.getBody(), StandardCharsets.UTF_8),
                    messageExt.getKeys(), messageExt.getMsgId());
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            log.error("ConsumerListener error : {}, message : {}", e, JSON.toJSONString(messageExt));
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }

    @Override
    public List<MQTopicInfo> getTopicList() {
        MQTopicInfo mqTopicInfo = new MQTopicInfo();
        mqTopicInfo.setName("topic1");
        mqTopicInfo.setTags(new HashSet<>(Arrays.asList("*")));
        return Arrays.asList(mqTopicInfo);
    }
}

3.验证结果

3.1 发送单向消息

模拟发送消息

http://127.0.0.1:8080/test/pulisher/oneWay?topic=topic1&tag=tag2&msg=单向消息测试

2021-07-03 16:37:32.534  INFO 18848 --- [      Thread-23] com.example.mqpublisher.RMQPublisher     : oneWayMessage sent to topic TOPIC-DEV-topic1, tag tag2, body "单向消息测试0" success 
2021-07-03 16:37:33.535  INFO 18848 --- [      Thread-23] com.example.mqpublisher.RMQPublisher     : oneWayMessage sent to topic TOPIC-DEV-topic1, tag tag2, body "单向消息测试1" success 
2021-07-03 16:37:34.535  INFO 18848 --- [      Thread-23] com.example.mqpublisher.RMQPublisher     : oneWayMessage sent to topic TOPIC-DEV-topic1, tag tag2, body "单向消息测试2" success 
2021-07-03 16:37:35.536  INFO 18848 --- [      Thread-23] com.example.mqpublisher.RMQPublisher     : oneWayMessage sent to topic TOPIC-DEV-topic1, tag tag2, body "单向消息测试3" success 
2021-07-03 16:37:36.536  INFO 18848 --- [      Thread-23] com.example.mqpublisher.RMQPublisher     : oneWayMessage sent to topic TOPIC-DEV-topic1, tag tag2, body "单向消息测试4" success 
2021-07-03 16:37:37.537  INFO 18848 --- [      Thread-23] com.example.mqpublisher.RMQPublisher     : oneWayMessage sent to topic TOPIC-DEV-topic1, tag tag2, body "单向消息测试5" success 
2021-07-03 16:37:38.538  INFO 18848 --- [      Thread-23] com.example.mqpublisher.RMQPublisher     : oneWayMessage sent to topic TOPIC-DEV-topic1, tag tag2, body "单向消息测试6" success 
2021-07-03 16:37:39.538  INFO 18848 --- [      Thread-23] com.example.mqpublisher.RMQPublisher     : oneWayMessage sent to topic TOPIC-DEV-topic1, tag tag2, body "单向消息测试7" success 
2021-07-03 16:37:40.538  INFO 18848 --- [      Thread-23] com.example.mqpublisher.RMQPublisher     : oneWayMessage sent to topic TOPIC-DEV-topic1, tag tag2, body "单向消息测试8" success 
2021-07-03 16:37:41.539  INFO 18848 --- [      Thread-23] com.example.mqpublisher.RMQPublisher     : oneWayMessage sent to topic TOPIC-DEV-topic1, tag tag2, body "单向消息测试9" success 

接收消息

2021-07-03 16:37:32.936  INFO 18328 --- [MessageThread_1] c.e.consumer.listener.ConsumerListener   : message received on topic topic1 , msgTag tag2,content: "单向消息测试0", msgKey tag2, msgID 00000000000000000000000000000001000018B4AAC20DDDFEF60014
2021-07-03 16:37:33.540  INFO 18328 --- [MessageThread_2] c.e.consumer.listener.ConsumerListener   : message received on topic topic1 , msgTag tag2,content: "单向消息测试1", msgKey tag2, msgID 00000000000000000000000000000001000018B4AAC20DDE02DF0016
2021-07-03 16:37:34.539  INFO 18328 --- [MessageThread_3] c.e.consumer.listener.ConsumerListener   : message received on topic topic1 , msgTag tag2,content: "单向消息测试2", msgKey tag2, msgID 00000000000000000000000000000001000018B4AAC20DDE06C70018
2021-07-03 16:37:35.542  INFO 18328 --- [MessageThread_4] c.e.consumer.listener.ConsumerListener   : message received on topic topic1 , msgTag tag2,content: "单向消息测试3", msgKey tag2, msgID 00000000000000000000000000000001000018B4AAC20DDE0AB0001A
2021-07-03 16:37:36.543  INFO 18328 --- [MessageThread_5] c.e.consumer.listener.ConsumerListener   : message received on topic topic1 , msgTag tag2,content: "单向消息测试4", msgKey tag2, msgID 00000000000000000000000000000001000018B4AAC20DDE0E98001C

2021-07-03 16:37:37.927  INFO 11484 --- [MessageThread_1] c.e.consumer.listener.ConsumerListener   : message received on topic topic1 , msgTag tag2,content: "单向消息测试5", msgKey tag2, msgID 00000000000000000000000000000001000018B4AAC20DDE1281001E
2021-07-03 16:37:38.544  INFO 11484 --- [MessageThread_2] c.e.consumer.listener.ConsumerListener   : message received on topic topic1 , msgTag tag2,content: "单向消息测试6", msgKey tag2, msgID 00000000000000000000000000000001000018B4AAC20DDE166A0020
2021-07-03 16:37:39.541  INFO 11484 --- [MessageThread_3] c.e.consumer.listener.ConsumerListener   : message received on topic topic1 , msgTag tag2,content: "单向消息测试7", msgKey tag2, msgID 00000000000000000000000000000001000018B4AAC20DDE1A520022
2021-07-03 16:37:40.542  INFO 11484 --- [MessageThread_4] c.e.consumer.listener.ConsumerListener   : message received on topic topic1 , msgTag tag2,content: "单向消息测试8", msgKey tag2, msgID 00000000000000000000000000000001000018B4AAC20DDE1E3A0024
2021-07-03 16:37:41.544  INFO 11484 --- [MessageThread_5] c.e.consumer.listener.ConsumerListener   : message received on topic topic1 , msgTag tag2,content: "单向消息测试9", msgKey tag2, msgID 00000000000000000000000000000001000018B4AAC20DDE22230026

3.2发送同步消息

模拟发送消息

http://127.0.0.1:8080/test/pulisher/sync?topic=topic1&tag=tag2&msg=同步消息测试

2021-07-03 17:07:13.989  INFO 19420 --- [      Thread-21] com.example.mqpublisher.RMQPublisher     : message sent to topic TOPIC-DEV-topic1, tag tag2, body "同步消息测试0" success 
2021-07-03 17:07:13.989  INFO 19420 --- [      Thread-21] c.e.p.c.TestPublisherController          : topic:topic1,tag:tag2,sendResult:SendResult [sendStatus=SEND_OK, msgId=00000000000000000000000000000001000018B4AAC20DF92D020000, offsetMsgId=0A0100C300002A9F000000000000E326, messageQueue=MessageQueue [topic=TOPIC-DEV-topic1, brokerName=broker-a, queueId=9], queueOffset=11]
2021-07-03 17:07:14.992  INFO 19420 --- [      Thread-21] com.example.mqpublisher.RMQPublisher     : message sent to topic TOPIC-DEV-topic1, tag tag2, body "同步消息测试1" success 
2021-07-03 17:07:14.993  INFO 19420 --- [      Thread-21] c.e.p.c.TestPublisherController          : topic:topic1,tag:tag2,sendResult:SendResult [sendStatus=SEND_OK, msgId=00000000000000000000000000000001000018B4AAC20DF931AF0002, offsetMsgId=0A0100C300002A9F000000000000E3FC, messageQueue=MessageQueue [topic=TOPIC-DEV-topic1, brokerName=broker-a, queueId=10], queueOffset=14]
2021-07-03 17:07:15.994  INFO 19420 --- [      Thread-21] com.example.mqpublisher.RMQPublisher     : message sent to topic TOPIC-DEV-topic1, tag tag2, body "同步消息测试2" success 
2021-07-03 17:07:15.995  INFO 19420 --- [      Thread-21] c.e.p.c.TestPublisherController          : topic:topic1,tag:tag2,sendResult:SendResult [sendStatus=SEND_OK, msgId=00000000000000000000000000000001000018B4AAC20DF935990004, offsetMsgId=0A0100C300002A9F000000000000E4D2, messageQueue=MessageQueue [topic=TOPIC-DEV-topic1, brokerName=broker-a, queueId=11], queueOffset=16]
2021-07-03 17:07:16.996  INFO 19420 --- [      Thread-21] com.example.mqpublisher.RMQPublisher     : message sent to topic TOPIC-DEV-topic1, tag tag2, body "同步消息测试3" success 
2021-07-03 17:07:16.996  INFO 19420 --- [      Thread-21] c.e.p.c.TestPublisherController          : topic:topic1,tag:tag2,sendResult:SendResult [sendStatus=SEND_OK, msgId=00000000000000000000000000000001000018B4AAC20DF939830006, offsetMsgId=0A0100C300002A9F000000000000E5A8, messageQueue=MessageQueue [topic=TOPIC-DEV-topic1, brokerName=broker-a, queueId=12], queueOffset=15]
2021-07-03 17:07:18.000  INFO 19420 --- [      Thread-21] com.example.mqpublisher.RMQPublisher     : message sent to topic TOPIC-DEV-topic1, tag tag2, body "同步消息测试4" success 
2021-07-03 17:07:18.000  INFO 19420 --- [      Thread-21] c.e.p.c.TestPublisherController          : topic:topic1,tag:tag2,sendResult:SendResult [sendStatus=SEND_OK, msgId=00000000000000000000000000000001000018B4AAC20DF93D6E0008, offsetMsgId=0A0100C300002A9F000000000000E67E, messageQueue=MessageQueue [topic=TOPIC-DEV-topic1, brokerName=broker-a, queueId=13], queueOffset=15]
2021-07-03 17:07:19.003  INFO 19420 --- [      Thread-21] com.example.mqpublisher.RMQPublisher     : message sent to topic TOPIC-DEV-topic1, tag tag2, body "同步消息测试5" success 
2021-07-03 17:07:19.003  INFO 19420 --- [      Thread-21] c.e.p.c.TestPublisherController          : topic:topic1,tag:tag2,sendResult:SendResult [sendStatus=SEND_OK, msgId=00000000000000000000000000000001000018B4AAC20DF94158000A, offsetMsgId=0A0100C300002A9F000000000000E754, messageQueue=MessageQueue [topic=TOPIC-DEV-topic1, brokerName=broker-a, queueId=14], queueOffset=12]
2021-07-03 17:07:20.005  INFO 19420 --- [      Thread-21] com.example.mqpublisher.RMQPublisher     : message sent to topic TOPIC-DEV-topic1, tag tag2, body "同步消息测试6" success 
2021-07-03 17:07:20.006  INFO 19420 --- [      Thread-21] c.e.p.c.TestPublisherController          : topic:topic1,tag:tag2,sendResult:SendResult [sendStatus=SEND_OK, msgId=00000000000000000000000000000001000018B4AAC20DF94544000C阿里技术行 | Apache RocketMQ 首届开发者沙龙报名启动!

阿里邀请技术人看秀!Dubbo & RocketMQ ,开发者的春天到了?

始于阿里,回归社区 | Apache RocketMQ 开发者沙龙

资源预告消息中间件RocketMQ

报名 | Apache RocketMQ 开发者沙龙 · 杭州站

你懂RocketMQ 的架构原理吗?