Dubbo源码,详解dubbo协议数据包及解包过程

Posted Java艺术

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo源码,详解dubbo协议数据包及解包过程相关的知识,希望对你有一定的参考价值。

关注 “Java艺术”一起来充电吧!

RPC协议即远程进程调用协议,自己的话理解就是,让两个进程之间的调用像同进程内调用一个函数那么简单。Dubbo框架的传输层默认使用dubbo协议,这也是一种RPC远程通信协议。学习Dubbo,我们有必要了解dubbo协议长什么样,最好的办法就是从源码中寻找答案。本篇将通过源码分析了解dubbo协议,以及了解dubbo协议数据包的解码过程。


Dubbo源码,详解dubbo协议数据包及解包过程


dubbo协议数据包


dubbo协议数据包无论是请求数据包还是响应数据包,大体分为两部分,一部分是头部,一部分是body。头部的长度为16字节,这是固定的。而头部又可分为5个小部分:魔数 + 序列化标志和消息类型 + 状态码 + 序列化后的消息body的长度 + 请求id

Dubbo源码,详解dubbo协议数据包及解包过程
  • 0~1字节:共两个字节,存储魔数,标志这是一个dubbo协议的数据包;

  • 2~2字节:共一个字节,高3位存储消息类型,低5位存储序列化协议id;

  • 3~3字节:共一个字节,请求数据包并未使用,响应数据包用来存储响应的状态码;

  • 4~7字节:共四个字节,描述body的长度;

  • 8~15字节:共八字节,请求id。


魔数是固定的,存储的是0xdabb,标志这是一个dubbo协议的请求或响应数据包;序列化标志就是序列化协议的id,每种序列化协议都对应一个id,比如hession2序列化协议的id是2;消息类型标志这是一个请求消息还是一个响应消息,以及是否事件类型的消息;请求id用于标识一次请求,客户端需要根据这个请求id识别一个响应是哪个请求的回复。


整个数据包非常的简单,而唯一算得上复杂一点的,就是第3个字节码。使用高3位标志一个消息类型,如第8位为0表示这是一个响应数据包,第8位为1表示这是一个请求数据包。低5位表示使用的序列化协议。


在发送消息时,先计算出序列化协议,比如hession2序列化协议的id为2,将序列化协议id‘或’FLAG_REQUEST得到10000010服务端只需要将该byte‘与’10000000,结果为1则拿到消息类型为FLAG_REQUEST说明这是一个请求数据包,将byte‘与’SERIALIZATION_MASK就能得到序列化协议的id。


protected static final byte FLAG_REQUEST = (byte) 0x80;//10000000
protected static final byte FLAG_TWOWAY = (byte) 0x40; //01000000
protected static final byte FLAG_EVENT = (byte) 0x20; //00100000
protected static final int SERIALIZATION_MASK = 0x1f; //00011111


可以看出,dubbo在努力的减少数据包的大小。与http协议的数据包相比,是不是感觉数据包小很多?当然,只是这样对比是没有意义的,假设一个接口调用,不需要任何参数,dubbo协议要描述请求哪个接口的哪个方法需要填写在body,http协议可以只在请求行上修改uri,那么此时,哪种协议的数据包更大,就需要通过抓包分析了。我的直觉告诉我,还是会比http协议的数据包小,因为http协议的请求头会比描述一个接口的某个方法的字符串更长。


dubbo协议的数据包解析过程会比http协议的请求效率高很多,因为dubbo协议不需要一行行探测是否是一个数据包的开始,只要找到魔术,就找到了一个数据包,按顺序读取16字节的数据就是请求头,根据请求头中的body长度就能拿到body,根据序列化算法就能解析body。


dubbo协议数据包解码源码分析


源码在dubbo-rpc模块的子模块dubbo-rpc-dubbo,主要分析的类:DubboCodecExchangeCodec

Dubbo源码,详解dubbo协议数据包及解包过程
  • ExchangeCodec负责编解码数据包

  • DubboCodec负责编解码body


解码数据包入口在ExchangeCodec类的decode方法


@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int readable = buffer.readableBytes();
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
buffer.readBytes(header);
return decode(channel, buffer, readable, header);
}

* 数据包头部长度:16byte
* 2byte魔数:0xdabb
* 1字节序列化标志|消息类型
* 1字节状态码
* 4byte数据包body长度
* 8byte请求id
* 请求body
*
* @param channel
* @param buffer
* @param readable
* @param header
* @return
* @throws IOException
*/
@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// check magic number.检查魔数是否是数据包的开始
if (readable > 0 && header[0] != MAGIC_HIGH || readable > 1 && header[1] != MAGIC_LOW) {
int length = header.length;
if (header.length < readable) {
header = Bytes.copyOf(header, readable);
buffer.readBytes(header, length, readable - length);
}
// 读取请求头
for (int i = 1; i < header.length - 1; i++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
return super.decode(channel, buffer, readable, header);
}
// check length. 检查读取到的字节长度是否大于请求头长度16字节码,如果不是,则等待接收到更多的字节再解析
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}

// get data length. 从请求头中获取数据包长度
int len = Bytes.bytes2int(header, 12);
checkPayload(channel, len);

// 如果当前读取的字节长度少于body长度+请求头长度,则继续等待接收到一个完整的数据包
int tt = len + HEADER_LENGTH;
if (readable < tt) {
return DecodeResult.NEED_MORE_INPUT;
}

// limit input stream.
// 根据长度截取一个完整的数据包
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

try {
return decodeBody(channel, is, header);
} finally {
if (is.available() > 0) {
try {
if (logger.isWarnEnabled()) {
logger.warn("Skip input stream " + is.available());
}
StreamUtils.skipUnusedStream(is);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}


decode方法首先从buffer中读取固定长度的请求头,检查请求头的前两个字节是否是0xdabb,如果不是,则循环探测找出0xdabb,这是解决粘包问题,如前一个数据包解析出错遗留下的一部分字节数据,需要去掉这部分脏数据。


接着检查当前buffer的长度是否少于请求头的长度,少于则数据包不完整,需要等待接收更多的字节数据再解析。否则从请求头中获取body长度,如果body长度加上请求头长度大于当前buffer的长度,则数据包不完整,继续等待接收更多的字节数据再解析。


如果数据包检验通过,则根据body长度从buffer中读取body,调用decodeBody继续解码数据包。子类DubboCodec覆写了decodeBody方法,因此我们需要分析DubboCodecdecodeBody方法


 @Override
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
// 获取消息类型与序列化协议id
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
// 获取请求id
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
// 解码响应数据包(客户端)
Response res = new Response(id);
if ((flag & FLAG_EVENT) != 0) {
res.setEvent(true);
}
// 获取响应状态码
byte status = header[3];
res.setStatus(status);
try {
if (status == Response.OK) {
Object data;
// 心跳包
if (res.isHeartbeat()) {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
data = decodeHeartbeatData(channel, in);
}
// 事件
else if (res.isEvent()) {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
data = decodeEventData(channel, in);
}
// 正常请求响应
else {
DecodeableRpcResult result;
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
result = new DecodeableRpcResult(channel, res, is,
(Invocation) getRequestData(id), proto);
result.decode();
} else {
result = new DecodeableRpcResult(channel, res,
new UnsafeByteArrayInputStream(readMessageData(is)),
(Invocation) getRequestData(id), proto);
}
data = result;
}
res.setResult(data);
} else {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
res.setErrorMessage(in.readUTF());
}
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode response failed: " + t.getMessage(), t);
}
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
return res;
} else {
// 解码请求数据包
Request req = new Request(id);
req.setVersion(Version.getProtocolVersion());
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
if ((flag & FLAG_EVENT) != 0) {
req.setEvent(true);
}
try {
Object data;
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
// 心跳包
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
}
// 事件
else if (req.isEvent()) {
data = decodeEventData(channel, in);
}
// 正常请求
else {
DecodeableRpcInvocation inv;
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
inv = new DecodeableRpcInvocation(channel, req, is, proto);
inv.decode();
} else {
inv = new DecodeableRpcInvocation(channel, req,
new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
data = inv;
}
req.setData(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode request failed: " + t.getMessage(), t);
}
// bad request
req.setBroken(true);
req.setData(t);
}
return req;
}
}


decodeBody方法虽然很长,但很简单。从请求头中获取到消息类型与序列化协议id、请求id。根据消息类型判断这是一个请求数据包还是响应数据包。如果flag & FLAG_REQUEST = FLAG_REQUEST,表示这是一个请求数据包,否则就是一个响应数据包。


响应数据包:创建Response对象,设置响应是否事件类型,如果flag & FLAG_EVENT = FLAG_EVENT,则说明此响应还是一个事件。响应数据包要求有状态码,请求头的第四个字节。根据响应状态码解码响应的data,响应数据包也分三种类型,分别是心跳包、事件响应、正常的请求响应。这里不讨论事件类型与心跳包。


请求数据包:与解码响应数据包一样,根据请求id创建一个Request对象,根据请求头的事件类型标志设置是否event,最大的区别是请求数据包不使用请求头的第四个字节。根据从请求头获取的序列化协议id调用CodecSupport.deserialize方法解码body。


public static ObjectInput deserialize(URL url, InputStream is, byte proto) throws IOException {
// 根据序列化标志获取序列化器
Serialization s = getSerialization(url, proto);
return s.deserialize(url, is);
}

public static Serialization getSerialization(URL url, Byte id) throws IOException {
Serialization serialization = getSerializationById(id);
// 从url获取序列器名称,默认为hession2
String serializationName = url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION);
// 为了安全起见,检查从网络传递的“序列化id”是否与此端的id匹配(仅对JDK序列化生效)
if (serialization == null
|| ((id == JAVA_SERIALIZATION_ID || id == NATIVE_JAVA_SERIALIZATION_ID || id == COMPACTED_JAVA_SERIALIZATION_ID)
&& !(serializationName.equals(ID_SERIALIZATIONNAME_MAP.get(id))))) {
throw new IOException("Unexpected serialization id:" + id + " received from network, please check if the peer send the right id.");
}
return serialization;
}


根据序列化协议id获取Serialization,调用Serializationdeserialize方法完成反序列化操作,序列化和反序列是我们最熟悉不过的,常用的api接口开发使用的就是json协议的序列化与反序列,而dubbo提供了很多种序列化协议的支持,如json、hession2。dubbo会为每种序列化协议分配一个id,使用map做映射,通过id拿到Serialization序列化器。


 /**
* id-序列化器 映射
*/
private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>();


Dubbo源码,详解dubbo协议数据包及解包过程

dubbo协议的数据包解码流程相比http协议的数据包解码流程简单许多,编码过程没有分析完整,但其实就是解码的反向操作而已。看完http协议数据包的解码过程与dubbo协议数据包的解码过程,也了解到http协议的数据包和dubbo协议的数据包长啥样了,你觉得哪种协议的解码效率更高、对内存的使用消耗更少?欢迎留言,说说你的看法。


往期相关推荐



公众号:Java艺术
扫码关注最新动态


以上是关于Dubbo源码,详解dubbo协议数据包及解包过程的主要内容,如果未能解决你的问题,请参考以下文章

dubbo协议

Dubbo源码解析:网络通信

一文详解 Dubbo 中的 http 协议

Dubbo底层原理,java分布式框架,源码分析

dubbo源码学习:暴露服务的过程

Dubbo源码分析系列-服务的发布