RocketMQ源码—Producer发送消息源码—单向同步异步发送消息一万字
Posted 刘Java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码—Producer发送消息源码—单向同步异步发送消息一万字相关的知识,希望对你有一定的参考价值。
基于RocketMQ release-4.9.3,深入的介绍了Producer发送单向、同步、异步消息的源码。
文章目录
此前,我们学习了RocketMQ源码(7)—Producer发送消息源码(1)—发送消息的总体流程【一万字】,我们知道了Producer发送消息的总体流程,现在我们专门来学习一个重要的发送消息的内部方法MQClientAPIImpl#sendMessage的源码。
当一切准备就绪,最终异步、单向、同步发送模式都会调用MQClientAPIImpl#sendMessage方法发送消息,该方法是真正的发起请求的方法。
这个方法的逻辑就比较简单了:
- 首先构建发送消息命令对象RemotingCommand,此时会判断是否需要更换轻量级消息头,如果sendSmartMsg属性为true(默认为true)或者是批量消息,则使用轻量级消息头。SendMessageRequestHeaderV2相比于requestHeader,其field 全为 a,b,c,d 等短变量名,可以加快FastJson反序列化过程。
- 根据发送模式执行不同的发送逻辑。单向发送模式调用RemotingClient#invokeOneway方法;异步发送模式调用MQClientAPIImpl#sendMessageAsync方法;同步发送模式调用MQClientAPIImpl#sendMessageSync方法。在异步和同步模式发送方法的调用前还会再检查是否超时,如果超时则不再调用。
/**
* MQClientAPIImpl的方法
* 同步、异步、单向消息的最终发送消息的方法
*
* @param addr brokerAddr
* @param brokerName brokerName
* @param msg msg
* @param requestHeader requestHeader
* @param timeoutMillis 剩余超时时间
* @param communicationMode 发送模式
* @param sendCallback 发送回调函数
* @param topicPublishInfo topic信息
* @param instance MQClientInstance
* @param retryTimesWhenSendFailed 异步发送失败时的重试次数,默认2
* @param context 发送消息上下文
* @param producer DefaultMQProducerImpl
* @return
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
public SendResult sendMessage(
final String addr,
final String brokerName,
final Message msg,
final SendMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int retryTimesWhenSendFailed,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException
long beginStartTime = System.currentTimeMillis();
RemotingCommand request = null;
//消息类型
String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
//是否时重试消息
boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
/*
* 1 构建发送消息命令对象RemotingCommand,更换轻量级消息头
*
* sendSmartMsg表示是否使用更轻量级的消息头SendMessageRequestHeaderV2
* 相比于requestHeader,其field 全为 a,b,c,d 等短变量名,可以加快FastJson反序列化过程。
*/
if (isReply)
//sendSmartMsg默认为true
if (sendSmartMsg)
//构建轻量级消息头
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
//创建发送消息命令对象,RequestCode为SEND_REPLY_MESSAGE_V2
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
else
//创建发送消息命令对象,RequestCode为SEND_REPLY_MESSAGE
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
else
if (sendSmartMsg || msg instanceof MessageBatch)
//构建轻量级消息头
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
//如果是批量消息,则RequestCode为SEND_BATCH_MESSAGE,否则RequestCode为SEND_MESSAGE_V2
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
else
//创建发送消息命令对象,RequestCode为SEND_MESSAGE
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
//设置body
request.setBody(msg.getBody());
/*
* 2 根据发送模式执行不同的发送逻辑
*/
switch (communicationMode)
case ONEWAY:
/*
* 单向发送,调用该方法之后不接收返回值,直接返回null
*/
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
case ASYNC:
//异步消息重试计数器
final AtomicInteger times = new AtomicInteger();
//计算是否超时,如果超时则不再发送
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeAsync)
throw new RemotingTooMuchRequestException("sendMessage call timeout");
/*
* 异步发送,调用该方法之后不接收返回值,直接返回null
*/
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
case SYNC:
//计算是否超时,如果超时则不再发送
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync)
throw new RemotingTooMuchRequestException("sendMessage call timeout");
/*
* 同步发送,调用该方法之后阻塞直至收到返回值
*/
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default:
assert false;
break;
return null;
1 invokeOneway单向发送
该模式下,消息只会发送一次,且不会返回任何结果,即只管发送不管结果。
/**
* NettyRemotingClient的方法
* <p>
* 单向消息发送的通用方法
*
* @param addr 服务器地址
* @param request 请求命令对象
* @param timeoutMillis 超时时间
*/
@Override
public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException
//获取或者建立同服务器的通道,即连接
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive())
try
//执行rpcHook的前置方法doBeforeRequest
doBeforeRpcHooks(addr, request);
/*
* 调用另一个invokeOnewayImpl方法,发送单向消息
*/
this.invokeOnewayImpl(channel, request, timeoutMillis);
catch (RemotingSendRequestException e)
log.warn("invokeOneway: send request exception, so close the channel[]", addr);
this.closeChannel(addr, channel);
throw e;
else
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
首先会调用getAndCreateChannel方法,尝试获取此broker的通道,如果没有获取到,那么会创建一个通道,即连接。然后执行rpcHook的前置方法doBeforeRequest,随后调用另一个invokeOnewayImpl方法,发送单向消息。
1.1 invokeOnewayImpl单向调用
这个方法是单向消息的真正发送方法,终于到了最底层了,不容易啊。
该方法首先将请求标记为单向发送,然后基于Semaphore信号量尝试获取单向发送的资源,通过信号量控制单向消息并发发送的消息数,从而保护系统内存占用。客户端单向发送的Semaphore信号量默认为65535,即单向消息最大并发为65535,可通过配置"com.rocketmq.remoting.clientOnewaySemaphoreValue"系统变量更改。
获取到了信号量资源之后。构建SemaphoreReleaseOnlyOnce对象,保证信号量本次只被释放一次,防止并发操作引起线程安全问题,然后就通过channel发送请求即可。
在其监听器ChannelFutureListener中,会释放信号量,如果发送失败了,仅仅是打印一行warn日志,然后就不管了。如果没有获取到信号量资源,那么直接抛出异常即可,并且不再发送。
只管发送不管结果,不会进行任何重试,这就是单向发送消息的真正含义。
/**
* NettyRemotingAbstract的方法
* <p>
* 单向消息发送的逻辑
*
* @param channel 通道
* @param request 请求
* @param timeoutMillis 超时时间
*/
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException
//标记为单向发送
request.markOnewayRPC();
//基于Semaphore信号量尝试获取单向发送的资源,通过信号量控制单向消息并发发送的消息数,从而保护系统内存占用。
//客户端单向发送的Semaphore信号量默认为65535,可通过配置"com.rocketmq.remoting.clientOnewaySemaphoreValue"系统变量更改
boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
//如果获取到了信号量资源
if (acquired)
//构建SemaphoreReleaseOnlyOnce对象,保证信号量本次只被释放一次,防止并发操作引起线程安全问题
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
try
//将请求发送出去即可
channel.writeAndFlush(request).addListener(new ChannelFutureListener()
@Override
public void operationComplete(ChannelFuture f) throws Exception
//释放信号量
once.release();
//如果发送失败了,仅仅是打印一行warn日志,然后就不管了,这就是单向发送
if (!f.isSuccess())
log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
);
catch (Exception e)
//释放信号量
once.release();
log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
else
//如果没有获取到信号量资源,那么直接抛出异常即可,并且不再发送
if (timeoutMillis <= 0)
throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
else
String info = String.format(
"invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreOnewayValue: %d",
timeoutMillis,
this.semaphoreOneway.getQueueLength(),
this.semaphoreOneway.availablePermits()
);
log.warn(info);
throw new RemotingTimeoutException(info);
2 sendMessageSync同步发送
该模式下,发送之后会同步阻塞直到结果返回。
/**
* MQClientAPIImpl的方法
*/
private SendResult sendMessageSync(
final String addr,
final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException
/*
* 发送同步消息
*/
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
/*
* 处理响应结果
*/
return this.processSendResponse(brokerName, msg, response, addr);
其内部调用NettyRemotingClient#invokeSync方法执行同步调用,然后调用processSendResponse方法处理响应结果。
2.1 invokeSync同步调用
该方法执行同步调用。首先获取或者创建通道,即连接。然后在发送消息前后执行rpcHook钩子方法,即RPCHook#doBeforeRequest方法,通过调用invokeSyncImpl方法发起同步调用并获取响应结果返回。
/**
* NettyRemotingClient的方法
* <p>
* 同步调用
*/
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException
long beginStartTime = System.currentTimeMillis();
//根据addr建立连接,获取channel
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive())
try
//执行rpc钩子的doBeforeRequest方法
doBeforeRpcHooks(addr, request);
//检查超时,如果超时则抛出异常
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime)
throw new RemotingTimeoutException("invokeSync call the addr[" + addr + "] timeout");
//执行同步远程调用,或者调用结果
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
//执行rpc钩子的doAfterResponse方法
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
return response;
catch (RemotingSendRequestException e)
log.warn("invokeSync: send request exception, so close the channel[]", addr);
this.closeChannel(addr, channel);
throw e;
catch (RemotingTimeoutException e)
if (nettyClientConfig.isClientCloseSocketIfTimeout())
this.closeChannel(addr, channel);
log.warn("invokeSync: close socket because of timeout, ms, ", timeoutMillis, addr);
log.warn("invokeSync: wait response timeout exception, the channel[]", addr);
throw e;
else
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
2.1.1 invokeSyncImpl同步调用实现
invokeSyncImpl方法发起同步调用并获取响应结果。
- 首先创建一个ResponseFuture,然后将本次请求id和respone存入responseTable缓存。
- 随后执行调用,并添加一个ChannelFutureListener,消息发送完毕会进行回调。然后responseFuture通过waitResponse方法阻塞当前线程,直到得到响应结果或者到达超时时间。
- 当ChannelFutureListener回调的时候会判断如果消息发送成功,那么设置发送成功并返回,否则设置发送失败标志和失败原因,并且设置响应结果为null,唤醒阻塞的responseFuture。
- responseFuture被唤醒后会进行一系列判断。如果响应结果为null,那么会根据不同情况抛出不同的异常,如果响应结果不为null,那么返回响应结果。
- 最后在finaly块中从responseTable中移除响应结果缓存。
/**
* NettyRemotingAbstract的方法
* <p>
* 执行同步调用
*/
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException
//获取请求id,通过id可以获取请求结果
final int opaque = request.getOpaque();
try
//创建一个Future的map成员ResponseFuture
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
//将请求id和responseFuture存入responseTable缓存中
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
//发送请求,添加一个ChannelFutureListener,消息发送完毕会进行回调
channel.writeAndFlush(request).addListener(new ChannelFutureListener()
@Override
public void operationComplete(ChannelFuture f) throws Exception
//如果消息发送成功,那么设置responseFuture发送成功并返回
if (f.isSuccess())
responseFuture.setSendRequestOK(true);
return;
else
responseFuture.setSendRequestOK(false);
//如果发送失败,那么从responseTable移除该缓存
responseTable.remove(opaque);
//设置失败原因
responseFuture.setCause(f.cause());
//设置响应结果为null,唤醒阻塞的responseFuture
//其内部调用了countDownLatch.countDown()方法
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
);
/*
* responseFuture同步阻塞等待直到得到响应结果或者到达超时时间
* 其内部调用了countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS)方法
*/
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
//如果响应结果为null
if (null == responseCommand)
//如果是发送成功,但是没有响应,表示等待响应超时,那么抛出超时异常
if (responseFuture.isSendRequestOK())
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
else
//如果是发送失败,抛出发送失败异常
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
//否则返回响应结果
return responseCommand;
finally
//最后从responseTable中移除响应结果缓存
this.responseTable.remove(opaque);
2.1.1.1 请求的阻塞和唤醒
从上面的源码可以得知,同步发送消息的请求可能会经历短暂的阻塞状态。responseFuture通过waitResponse方法阻塞当前线程,直到得到响应结果或者到达超时时间。进入该方法,可以发现其阻塞使用的工具其实就是CountDownLatch。
/**
* ResponseFuture的方法
* <p>
* 同步等待响应结果
*
* @param timeoutMillis 超时时间
*/
public RemotingCommand waitResponse(final RocketMQ源码—Producer发送单向同步异步消息源码一万字