Netty协议-Rocket MQ之NettyRemotingClient/Server
Posted 爱叨叨的程序狗
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty协议-Rocket MQ之NettyRemotingClient/Server相关的知识,希望对你有一定的参考价值。
RocketMQ性能强劲依赖于Netty通讯协议和特定的通讯协议。
通信流程
NettyRemotingAbstract类的内部有一个NettyServerHandler内部类,该类中封装了客户端与服务端交互的基础代码。
processMessageReceived根据cmd类型处理msg信息
@ChannelHandler.Sharable
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand>
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception
processMessageReceived(ctx, msg);
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception
final RemotingCommand cmd = msg;
if (cmd != null)
switch (cmd.getType())
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
processResponseCommand
/**
* Process incoming request command issued by remote peer.
*
* @param ctx channel handler context.
* @param cmd request command.
*/
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd)
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();
if (pair != null)
Runnable run = new Runnable()
@Override
public void run()
try
//解析远程(服务端)channel
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
//RPC调用前执行钩子函数
doBeforeRpcHooks(remoteAddr, cmd);
final RemotingResponseCallback callback = new RemotingResponseCallback()
@Override
public void callback(RemotingCommand response)
doAfterRpcHooks(remoteAddr, cmd, response);
//判断是否为单向通信
if (!cmd.isOnewayRPC())
if (response != null)
response.setOpaque(opaque);
response.markResponseType();
try
//消息追加到内存后刷入硬盘中
ctx.writeAndFlush(response);
catch (Throwable e)
else
;
if (pair.getObject1() instanceof AsyncNettyRequestProcessor)
AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
//异步处理请求
processor.asyncProcessRequest(ctx, cmd, callback);
else
NettyRequestProcessor processor = pair.getObject1();
RemotingCommand response = processor.processRequest(ctx, cmd);
callback.callback(response);
catch (Throwable e)
log.error("process request exception", e);
log.error(cmd.toString());
if (!cmd.isOnewayRPC())
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
;
try
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
pair.getObject2().submit(requestTask);
catch (RejectedExecutionException e)
if ((System.currentTimeMillis() % 10000) == 0)
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ ", too many requests and system thread pool busy, RejectedExecutionException "
+ pair.getObject2().toString()
+ " request code: " + cmd.getCode());
if (!cmd.isOnewayRPC())
//非oneWay方式消息发送,构造发送消息响应信息
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
...
processResponseCommand
/**
* Process response from remote peer to the previous issued requests.
*
* @param ctx channel handler context.
* @param cmd response command instance.
*/
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd)
//opaque被赋值为requestId
final int opaque = cmd.getOpaque();
//从map缓存获取正在进行的其中一个请求
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null)
responseFuture.setResponseCommand(cmd);
//移除本次请求
responseTable.remove(opaque);
if (responseFuture.getInvokeCallback() != null)
//执行回调(排序、执行完成)
executeInvokeCallback(responseFuture);
else
responseFuture.putResponse(cmd);
responseFuture.release();
else
log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(cmd.toString());
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
new ConcurrentHashMap<Integer, ResponseFuture>(256);processorTable :记录全部请求、以及处理器。
responseTable记录所有响应。
更清晰的文件请下载:
通讯协议
RocketMQ自定义的私有协议栈都是基于TCP/IP协议,使用Netty的NIO TCP协议栈进行私有协议栈的定制和开发。
RocketMQ协议共分为四个部分:
- Header data:协议头,数据是序列化【fastjosn】后的json,json的每个key字段都是固定的,
- body data:请求的二进制实际数据,例如发送消息的网络请求中,Body传输实际的消息内容。
- Length:消息总长度
- Header length:序列化类型&消息头长度,第一个字节表示序列化类型,后面三个自己表示消息头长度。
RemotingCommand类封装了通讯消息、编码、解码方式,这些组成了RocketMQ的通讯协议。
//根据自定义协议解析消息头、消息体
public static RemotingCommand decode(final ByteBuffer byteBuffer) throws RemotingCommandException
int length = byteBuffer.limit();
int oriHeaderLen = byteBuffer.getInt();
//获取头部报文长度
int headerLength = getHeaderLength(oriHeaderLen);
//获取头部报文数据
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
//反序列化解析Header data和RemotingCommand类
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
//获取body长度
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0)
//获取Body数据
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
cmd.body = bodyData;
return cmd;
//根据自定义协议编码消息头、消息体
public ByteBuffer encode()
// 1> header length size
int length = 4;
// 2> header data length
byte[] headerData = this.headerEncode();
length += headerData.length;
// 3> body data length
if (this.body != null)
length += body.length;
ByteBuffer result = ByteBuffer.allocate(4 + length);
// length
result.putInt(length);
// header length
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
result.put(headerData);
// body data;
if (this.body != null)
result.put(this.body);
result.flip();
return result;
以上是关于Netty协议-Rocket MQ之NettyRemotingClient/Server的主要内容,如果未能解决你的问题,请参考以下文章