阿里专家分享:RocketMQ底层通信机制
Posted 中生代架构
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了阿里专家分享:RocketMQ底层通信机制相关的知识,希望对你有一定的参考价值。
作者介绍
杨开元
阿里巴巴数据专家,毕业于北京大学,有10年IT行业研发经验。对RocketMQ有深入的研究,是RocketMQ源码贡献者。
曾就职于甲骨文和猎豹移动,专注于大数据和实时计算。在大量的工作实践中,对mysql、J2EE、JVM、Spring、Hadoop、Kafka、Storm、Flink都有深入研究。
喜欢剖析源码,分析原理,为开源项目贡献代码。
第62篇架构好文:4802字 | 9分钟阅读
前言
分布式系统各个角色间的通信效率很关键,通信效率的高低直接影响系统性能,基于Socket实现一个高效的Tcp通信协议是个很有挑战的事情。
本节说明RocketMQ是如何解决这个问题的。
01
Remoting模块
_____
RocketMQ的通信相关代码在Remoting模块里,先来看看主要类结构。
图1-1 Remoting模块的类继承关系
RemotingService为最上层接口,定义了三个方法:
void start();
void shutdown();
void registerRPCHook(RPCHookrpcHook);
RemotingClient,RemotingServer继承RemotingService接口, 并增加了自己特有的方法。
代码清单1-1 RemotingClient主要函数定义
1void registerProcessor(final int requestCode, finalNettyRequestProcessor processor,final ExecutorService executor);
2RemotingCommand invokeSync(final String addr, final RemotingCommandrequest, final long timeoutMillis);
3void invokeAsync(final String addr, final RemotingCommand request,final long timeoutMillis,final InvokeCallback invokeCallback);
4void invokeOneway(final String addr, final RemotingCommand request,final long timeoutMillis);
5void updateNameServerAddressList(final List<String> addrs);
然后看看具体的实现类,NettyRemotingClient和NettyRemotingServer分别实现了RemotingClient和RemotingServer, 而且都继承了NettyRemotingAbstract类.
通过上面的封装,RocketMQ各个模块间的通信,可以通过发送统一格式的自定义消息(RemotingCommand)来完成的,各个模块间的通信实现简洁明了。
比如NameServer模块中,NameServerController有个remotingServer变量,NameServer在启动时初始化好各个变量,然后启动remotingServer即可,剩下NameServer要做的是专心实现好处理RemotingCommand的逻辑。
代码清单1-2 NameServer处理主流程代码
1@Override
2public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {
3 if (log.isDebugEnabled()){
4 log.debug("receive request, {} {} {}",
5 request.getCode(),
6 RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
7 request);
8 }
9 switch (request.getCode()){
10 caseRequestCode.PUT_KV_CONFIG:
11 returnthis.putKVConfig(ctx, request);
12 caseRequestCode.GET_KV_CONFIG:
13 returnthis.getKVConfig(ctx, request);
14 caseRequestCode.DELETE_KV_CONFIG:
15 returnthis.deleteKVConfig(ctx, request);
16 caseRequestCode.REGISTER_BROKER:
17 VersionbrokerVersion = MQVersion.value2Version(request.getVersion());
18 if (brokerVersion.ordinal()>= MQVersion.Version.V3_0_11.ordinal()) {
19 returnthis.registerBrokerWithFilterServer(ctx, request);
20 } else {
21 returnthis.registerBroker(ctx, request);
22 }
23 caseRequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
24 returnthis.getHasUnitSubUnUnitTopicList(ctx, request);
25 caseRequestCode.UPDATE_NAMESRV_CONFIG:
26 returnthis.updateConfig(ctx, request);
27 caseRequestCode.GET_NAMESRV_CONFIG:
28 returnthis.getConfig(ctx, request);
29 default:
30 break;
31 }
32 return null;
33}
在Consumer的源码中,获取消息的底层的通信部分也是发送一个RemotingComand 请求,返回的response也是个RemotingCommand类型。
代码清单1-3 Consumer请求消息底层实现代码
1private PullResult pullMessageSync(//
2 final String addr, // 1
3 final RemotingCommandrequest, // 2
4 final long timeoutMillis//3
5) throws RemotingException, InterruptedException, MQBrokerException{
6 RemotingCommand response =this.remotingClient.invokeSync(addr, request, timeoutMillis);
7 assert response != null;
8 returnthis.processPullResponse(response);
9}
从源码中可以看出,RocketMQ中复杂的通信过程,被RemotingCommand统一起来,大部分的逻辑都是通过发送Command,接受并处理Command完成。
02
协议设计和编解码
_____
RocketMQ自己定义了一个通信协议,使得模块间传输的二进制消息和有意义的内容之间互相转换。协议格式如图1-2所示。
图1-2 RocketMQ的通信协议
(1)第一部分是大端4个字节整数,值等于第二,三,四部分长度总和
(2)第二部分是大端4个字节整数,值等于第三部分的长度
(3)第三部分是通过json 序列化的数据
(4)第四部分是通过应用自定义二进制序列化的数据
消息的解码过程在RomotingCommand的decode函数里。
代码清单1-4 消息解码函数
1public static RemotingCommand decode(final ByteBuffer byteBuffer) {
2 int length =byteBuffer.limit();
3 int oriHeaderLen =byteBuffer.getInt();
4 int headerLength =getHeaderLength(oriHeaderLen);
5 byte[] headerData = newbyte[headerLength];
6 byteBuffer.get(headerData);
7 RemotingCommand cmd =headerDecode(headerData, getProtocolType(oriHeaderLen));
8 int bodyLength = length - 4 - headerLength;
9 byte[] bodyData = null;
10 if (bodyLength > 0) {
11 bodyData = newbyte[bodyLength];
12 byteBuffer.get(bodyData);
13 }
14 cmd.body = bodyData;
15 return cmd;
16}
对应的消息编码过程在RemotingCommand的encode函数中。
代码清单1-5 消息编码函数
1public ByteBuffer encode() {
2 // 1> header lengthsize
3 int length = 4;
4 // 2> header datalength
5 byte[] headerData =this.headerEncode();
6 length +=headerData.length;
7 // 3> body data length
8 if (this.body != null) {
9 length += body.length;
10 }
11 ByteBuffer result =ByteBuffer.allocate(4 + length);
12 // length
13 result.putInt(length);
14 // header length
15 result.put(markProtocolType(headerData.length,serializeTypeCurrentRPC));
16 // header data
17 result.put(headerData);
18 // body data;
19 if (this.body != null) {
20 result.put(this.body);
21 }
22 result.flip();
23 return result;
24}
03
Netty库
_____
RocketMQ是基于Netty库来完成RemotingServer和RemotingClient具体的通信实现的,Netty是个事件驱动的网络编程框架,它屏蔽了Java Socket,Nio等复杂细节,用户只需用好Netty,就可以实现一个网络编程专家+并发编程专家水平的Server、Client网络程序。
应用Netty有一定的门槛,需要了解它的EventLoopGroup,Channel,Handler模型以及各种具体的配置。
RocketMQ利用Netty实现的通信类是NettyRemotingServer和NettyRemotingClient,用户也可以参考这两个类的实现来学习使用Netty。
想要加入中生代架构群的小伙伴,请添加群助手小姜的微信
申请备注(姓名+公司+技术方向)才能通过哦!
“做技术要懂管理,做管理要学模式”
突破是中生代社区新上架的图书
签名版即将售罄
购书后加上方微信可以进书友群
申请备注(购书)
以上是关于阿里专家分享:RocketMQ底层通信机制的主要内容,如果未能解决你的问题,请参考以下文章
精华推荐 | 深入浅出RocketMQ原理及实战「底层原理挖掘系列」透彻剖析贯穿RocketMQ的存储系统的实现原理和持久化机制
精华推荐 | 深入浅出RocketMQ原理及实战「底层原理挖掘系列」透彻剖析贯穿RocketMQ的存储系统的实现原理和持久化机制