RocketMQ Source Code Study Notes

Posted by Shi Peng

1. Environment Setup

1.1 Download the source code

Repository: https://github.com/apache/rocketmq

git clone https://github.com/apache/rocketmq.git

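1.2 Import the Maven project into the IDE

1.3 Prepare the startup configuration

1) Create a new folder named conf in the root directory of the downloaded rocketmq source.
2) Copy broker.conf, broker-a.properties, and logback_broker.xml from rocketmq\distribution\conf into the new conf directory, then update the contents of broker-a.properties (the part in the red box is newly added):

This is the configuration that gets loaded when the nameserver starts.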
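The screenshot with the added lines is not reproduced here. Going only by the values that show up in the logs later in these notes (broker name broker-a, cluster DefaultCluster, name server at 127.0.0.1:9876), the relevant entries probably looked roughly like the string in the sketch below. This is an assumption, not a copy of the original file. The sketch uses plain java.util.Properties to show how such a key=value file is read; the class name BrokerConfSketch is made up for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of RocketMQ: parses key=value text the way
// a .properties file is normally read in Java.
final class BrokerConfSketch {

    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // Reading from an in-memory string should not fail; rethrow unchecked.
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Assumed contents, inferred from the log output in these notes.
        String conf = "brokerClusterName=DefaultCluster\n"
                + "brokerName=broker-a\n"
                + "namesrvAddr=127.0.0.1:9876\n";
        Properties props = parse(conf);
        System.out.println("namesrvAddr = " + props.getProperty("namesrvAddr"));
    }
}
```

If the broker later reports a different name server address than expected, this file is the first place to check.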

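1.4 Start the NameServer and the Broker

1.4.1 Start the NameServer first

In the namesrv module, find NamesrvStartup.java. Before running it, open its run configuration and set ROCKETMQ_HOME under the environment variables.

Once the nameserver starts successfully, it prints: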
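Why the variable matters: as far as I can tell from the source, the startup code takes the home directory from the rocketmq.home.dir system property and falls back to the ROCKETMQ_HOME environment variable, then looks for its logback configuration under the conf folder of that directory. The sketch below only mimics that lookup order so the setting can be checked before launching. The names RocketmqHomeCheck and resolveHome are made up for illustration and are not RocketMQ classes.

```java
// Hypothetical pre-flight check, not part of RocketMQ.
final class RocketmqHomeCheck {

    // Prefer the system property; fall back to the environment variable.
    static String resolveHome(String systemProperty, String envVariable) {
        return systemProperty != null ? systemProperty : envVariable;
    }

    public static void main(String[] args) {
        String home = resolveHome(
                System.getProperty("rocketmq.home.dir"),
                System.getenv("ROCKETMQ_HOME"));
        if (home == null) {
            System.out.println("ROCKETMQ_HOME is not set; set it in the run configuration.");
        } else {
            System.out.println("RocketMQ home resolved to: " + home);
        }
    }
}
```

Running this with the same run configuration as NamesrvStartup is a quick way to confirm the IDE is actually passing the variable through.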

The Name Server boot success. serializeType=JSON

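1.4.2 Then start the Broker

In the broker module, find BrokerStartup.java and set up its run configuration:
1) Set the startup arguments under Arguments

This configuration includes the namesrvAddr setting modified above, among others.

2) Set the environment variables for startup:

Then run it. The output is: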

The broker[broker-a, 10.91.199.184:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876

1.5 Produce and consume messages

1.5.1 Producer

In the rocketmq-example module, find Producer.java and add the nameserver address (the one configured earlier in broker-a.properties):

Run it. The output is:

SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C696633809874997A0000, offsetMsgId=0A5BC7B800002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C69663380987499E50001, offsetMsgId=0A5BC7B800002A9F00000000000000BE, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C69663380987499EB0002, offsetMsgId=0A5BC7B800002A9F000000000000017C, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C69663380987499F00003, offsetMsgId=0A5BC7B800002A9F000000000000023A, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C69663380987499F60004, offsetMsgId=0A5BC7B800002A9F00000000000002F8, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C69663380987499F90005, offsetMsgId=0A5BC7B800002A9F00000000000003B6, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C69663380987499FC0006, offsetMsgId=0A5BC7B800002A9F0000000000000474, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C6966338098749A000007, offsetMsgId=0A5BC7B800002A9F0000000000000532, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C6966338098749A020008, offsetMsgId=0A5BC7B800002A9F00000000000005F0, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C6966338098749A060009, offsetMsgId=0A5BC7B800002A9F00000000000006AE, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=2]
14:29:38.465 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:29:38.472 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[10.91.199.184:10911] result: true
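The offsetMsgId values in the output above are not random. For an IPv4 broker the 32 hex characters appear to encode 4 bytes of broker IP, 4 bytes of port, and 8 bytes of commit log offset, which matches this run: 0A5BC7B8 is 10.91.199.184 and 00002A9F is 10911. The following is a minimal sketch of that decoding under the IPv4 assumption; OffsetMsgIdSketch and describe are hypothetical names, not RocketMQ APIs.

```java
import java.nio.ByteBuffer;

// Hypothetical decoder for the 16-byte (IPv4) offsetMsgId layout:
// [4 bytes IP][4 bytes port][8 bytes commit log offset], big-endian.
final class OffsetMsgIdSketch {

    static byte[] hexToBytes(String hex) {
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    static String describe(String offsetMsgId) {
        ByteBuffer buf = ByteBuffer.wrap(hexToBytes(offsetMsgId)); // big-endian by default
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            if (i > 0) {
                ip.append('.');
            }
            ip.append(buf.get() & 0xFF); // treat each byte as unsigned
        }
        int port = buf.getInt();
        long commitLogOffset = buf.getLong();
        return ip + ":" + port + " @ " + commitLogOffset;
    }

    public static void main(String[] args) {
        // Second message from the producer output above.
        System.out.println(describe("0A5BC7B800002A9F00000000000000BE"));
        // prints "10.91.199.184:10911 @ 190"
    }
}
```

The decoded offsets grow by 190 per message (0, 190, 380, ...), which lines up with the storeSize=190 and commitLogOffset values the consumer prints.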

1.5.2 Consumer

In the rocketmq-example module, find Consumer.java and add the NameServer address:

Then run it. The result is as follows:

Consumer Started.
ConsumeMessageThread_please_rename_unique_group_name_4_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=190, queueOffset=0, sysFlag=0, bornTimestamp=1648621778314, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778374, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Messagetopic='TopicTest', flag=0, properties=MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C696633809874997A0000, CLUSTER=DefaultCluster, TAGS=TagA, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null']] 
ConsumeMessageThread_please_rename_unique_group_name_4_3 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=190, queueOffset=0, sysFlag=0, bornTimestamp=1648621778411, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778414, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F000000000000017C, commitLogOffset=380, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Messagetopic='TopicTest', flag=0, properties=MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C69663380987499EB0002, CLUSTER=DefaultCluster, TAGS=TagA, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId='null']] 
ConsumeMessageThread_please_rename_unique_group_name_4_6 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=190, queueOffset=1, sysFlag=0, bornTimestamp=1648621778432, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778433, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F0000000000000532, commitLogOffset=1330, bodyCRC=988340972, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Messagetopic='TopicTest', flag=0, properties=MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C6966338098749A000007, CLUSTER=DefaultCluster, TAGS=TagA, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55], transactionId='null']] 
ConsumeMessageThread_please_rename_unique_group_name_4_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=1, storeSize=190, queueOffset=0, sysFlag=0, bornTimestamp=1648621778405, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778409, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F00000000000000BE, commitLogOffset=190, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Messagetopic='TopicTest', flag=0, properties=MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C69663380987499E50001, CLUSTER=DefaultCluster, TAGS=TagA, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null']] 
ConsumeMessageThread_please_rename_unique_group_name_4_4 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=190, queueOffset=0, sysFlag=0, bornTimestamp=1648621778416, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778420, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F000000000000023A, commitLogOffset=570, bodyCRC=1032136437, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Messagetopic='TopicTest', flag=0, properties=MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C69663380987499F00003, CLUSTER=DefaultCluster, TAGS=TagA, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 51], transactionId='null']] 
ConsumeMessageThread_please_rename_unique_group_name_4_5 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=190, queueOffset=1, sysFlag=0, bornTimestamp=1648621778422, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778424, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F00000000000002F8, commitLogOffset=760, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Messagetopic='TopicTest', flag=0, properties=MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C69663380987499F60004, CLUSTER=DefaultCluster, TAGS=TagA, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52], transactionId='null']] 
ConsumeMessageThread_please_rename_unique_group_name_4_9 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=190, queueOffset=2, sysFlag=0, bornTimestamp=1648621778434, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778436, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F00000000000005F0, commitLogOffset=1520, bodyCRC=710410109, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Messagetopic='TopicTest', flag=0, properties=MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C6966338098749A020008, CLUSTER=DefaultCluster, TAGS=TagA, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56], transactionId='null']] 
ConsumeMessageThread_please_rename_unique_group_name_4_7 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=190, queueOffset=1, sysFlag=0, bornTimestamp=1648621778428, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778430, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F0000000000000474, commitLogOffset=1140, bodyCRC=1307562618, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Messagetopic='TopicTest', flag=0, properties=MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C69663380987499FC0006, CLUSTER=DefaultCluster, TAGS=TagA, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101
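The body=[...] field in the consumer output is the raw message payload printed as decimal byte values. Assuming the example producer encodes its text as UTF-8, turning those numbers back into a string shows what was actually sent. A small sketch follows; BodyDecodeSketch is a made-up helper name.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: converts the decimal byte values shown in the
// consumer log back into text, assuming UTF-8 encoding.
final class BodyDecodeSketch {

    static String decode(int... values) {
        byte[] bytes = new byte[values.length];
        for (int i = 0; i < values.length; i++) {
            bytes[i] = (byte) values[i];
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Body of the first message in the consumer output above.
        System.out.println(decode(72, 101, 108, 108, 111, 32, 82, 111,
                99, 107, 101, 116, 77, 81, 32, 48));
        // prints "Hello RocketMQ 0"
    }
}
```

Only the last byte differs between messages (48, 49, 50, ...), which is the ASCII digit for the message index.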