MQTT---HiveMQ源码详解(十三)Netty-MQTT消息事件处理(源码举例解读)
Posted 西安-PP
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MQTT---HiveMQ源码详解(十三)Netty-MQTT消息事件处理(源码举例解读)相关的知识,希望对你有一定的参考价值。
前言
上一篇讲的是大致流程,这一篇我们从流程中抽取一步,以Authentication部分的源码为例进行解读,帮助大家加深对上一篇内容的理解。
MqttConnectHandler
MqttConnectHandler是SimpleChannelInboundHandler的子类,收到Connect消息后由下面的channelRead0方法处理。
channelRead0
/**
 * Entry point for an inbound {@code Connect} message.
 *
 * <p>First installs the {@code disallowSecondConnect} handler right after the MQTT message
 * decoder. If the pipeline rejects the insertion with an {@link IllegalArgumentException}
 * (presumably because a handler with that name is already installed - confirm against the
 * Netty {@code ChannelPipeline} contract), the message is re-fired from the first context of
 * the pipeline and this handler does nothing else with it.
 *
 * <p>Otherwise the client identifier is validated and, when valid, the channel is marked as
 * not taken over, the connect-idle handler is removed and plugin authentication starts.
 *
 * @param ctx the channel handler context of this handler
 * @param msg the received CONNECT message
 * @throws Exception declared by the overridden method
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Connect msg) throws Exception {
    try {
        // Install MqttDisallowSecondConnect directly behind the decoder.
        ctx.pipeline().addAfter(Pipelines.MQTT_MESSAGE_DECODER, Pipelines.MQTT_DISALLOW_SECOND_CONNECT, this.disallowSecondConnect);
    } catch (IllegalArgumentException duplicate) {
        // Insertion was rejected: hand the message back to the head of the pipeline and stop here.
        ctx.pipeline().firstContext().fireChannelRead(msg);
        return;
    }
    // Only continue when the client id passes validation.
    if (validIdentifier(ctx, msg)) {
        // Flag: this connection has not been taken over.
        ctx.channel().attr(AttributeKeys.MQTT_TAKEN_OVER).set(false);
        // Drop the handler that times out connections which never send a CONNECT.
        removeConnectIdleHandler(ctx);
        // Enter the plugin authentication phase.
        pluginOnAuthentication(ctx, msg);
    }
}
pluginOnAuthentication
/**
 * Starts plugin authentication for a CONNECT, or skips it when no
 * {@code OnAuthenticationCallback} is available.
 *
 * @param ctx     the channel handler context
 * @param connect the CONNECT message being processed
 */
private void pluginOnAuthentication(ChannelHandlerContext ctx, Connect connect) {
    // The client token of this channel; it is passed on as the client credentials.
    ClientToken clientToken = ChannelUtils.clientToken(ctx.channel());

    if (!this.callbackRegistry.isAvailable(OnAuthenticationCallback.class)) {
        // No authentication callback available: treat the client as accepted and go straight to
        // the LWT authorization step (the will topic permission is checked there).
        pluginOnAuthorizationLWT(ctx, null, connect, clientToken, ReturnCode.ACCEPTED, true);
        return;
    }

    // Append the handler that runs the authentication callbacks ...
    ctx.pipeline().addLast(Pipelines.PLUGIN_ON_AUTHENTICATION_CALLBACK_HANDLER, this.pluginOnAuthenticationCallbackHandlerProvider.get());
    // ... and trigger it with a PluginOnAuthentication user event.
    ctx.fireUserEventTriggered(new PluginOnAuthentication(connect, clientToken));
}
userEventTriggered
/**
 * Dispatches the user events that mark the end of each step of CONNECT processing to the
 * matching private handler method; any other event is delegated to the superclass.
 *
 * @param ctx the channel handler context
 * @param evt the user event
 * @throws Exception declared by the overridden method
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    // Plugin authentication finished.
    if (evt instanceof PluginOnAuthenticationCompleted) {
        pluginOnAuthenticationCompleted(ctx, (PluginOnAuthenticationCompleted) evt);
        return;
    }
    if (evt instanceof PluginRestrictionsAfterLoginCompleted) {
        pluginRestrictionsAfterLoginCompleted(ctx, (PluginRestrictionsAfterLoginCompleted) evt);
        return;
    }
    if (evt instanceof PluginOnConnectCompleted) {
        pluginOnConnectCompleted(ctx, (PluginOnConnectCompleted) evt);
        return;
    }
    if (evt instanceof PluginOnAuthorizationCompleted) {
        pluginOnAuthorizationCompleted(ctx, (PluginOnAuthorizationCompleted) evt);
        return;
    }
    if (evt instanceof MqttConnectPersistenceHandler.OnConnectPersistenceCompleted) {
        MqttConnectPersistenceHandler.OnConnectPersistenceCompleted persisted =
                (MqttConnectPersistenceHandler.OnConnectPersistenceCompleted) evt;
        onConnectPersistenceCompleted(ctx, persisted.getConnect(), persisted.isSessionPresent());
        return;
    }
    // Not one of ours: default handling.
    super.userEventTriggered(ctx, evt);
}
pluginOnAuthenticationCompleted
/**
 * Continues CONNECT processing after plugin authentication has finished by handing the
 * outcome to the LWT authorization step.
 *
 * @param ctx   the channel handler context
 * @param event the authentication outcome (connect, credentials, return code, optional exception)
 */
private void pluginOnAuthenticationCompleted(ChannelHandlerContext ctx,
                                             PluginOnAuthenticationCompleted event) {
    // The return code produced by authentication; only ACCEPTED counts as authenticated.
    ReturnCode code = event.getReturnCode();
    pluginOnAuthorizationLWT(ctx, event.getException(), event.getConnect(),
            event.getClientCredentials(), code, code == ReturnCode.ACCEPTED);
}
PluginOnAuthenticationCallbackHandler
/**
 * Inbound handler that runs the registered {@link OnAuthenticationCallback}s for a CONNECT one
 * after another and reports the overall outcome as a {@link PluginOnAuthenticationCompleted}
 * user event on the pipeline.
 *
 * <p>The handler reacts to two user events:
 * <ul>
 *   <li>{@link PluginOnAuthentication} - start authentication for a CONNECT;</li>
 *   <li>{@link PluginOnAuthenticationCallbackCompleted} - one callback has produced a result;
 *       decide whether the outcome is final or the next callback must run.</li>
 * </ul>
 *
 * <p>The instance keeps no per-connection fields; the remaining callbacks and the collected
 * results travel inside the events.
 */
@Singleton
@ChannelHandler.Sharable
public class PluginOnAuthenticationCallbackHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(PluginOnAuthenticationCallbackHandler.class);

    private final CallbackRegistry callbackRegistry;
    private final HiveMQConfigurationService hiveMQConfigurationService;
    private final Metrics metrics;
    private final CallbackExecutor callbackExecutor;

    @Inject
    public PluginOnAuthenticationCallbackHandler(CallbackRegistry callbackRegistry,
                                                 HiveMQConfigurationService hiveMQConfigurationService,
                                                 Metrics metrics,
                                                 CallbackExecutor callbackExecutor) {
        this.callbackRegistry = callbackRegistry;
        this.hiveMQConfigurationService = hiveMQConfigurationService;
        this.metrics = metrics;
        this.callbackExecutor = callbackExecutor;
    }

    /**
     * Routes the two authentication events to their handlers; everything else is passed to the
     * superclass implementation.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof PluginOnAuthentication) {
            // Plugin authentication was requested.
            onAuthentication(ctx, (PluginOnAuthentication) evt);
        } else if (evt instanceof PluginOnAuthenticationCallbackCompleted) {
            // A single callback finished; evaluate its result.
            onAuthenticationCallbackCompleted(ctx, (PluginOnAuthenticationCallbackCompleted) evt);
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    /**
     * Starts authentication: submits the first registered callback, or - when there is no
     * callback to run - immediately fires the completed event with a result that depends on the
     * "need all plugins to return true" setting.
     */
    private void onAuthentication(ChannelHandlerContext ctx, PluginOnAuthentication event) {
        // Availability is checked again here because of the time gap since the event was fired.
        if (this.callbackRegistry.isAvailable(OnAuthenticationCallback.class)) {
            // Work on a private copy of the registered callbacks.
            Deque<OnAuthenticationCallback> leftCallbacks =
                    new ArrayDeque<>(this.callbackRegistry.getCallbacks(OnAuthenticationCallback.class));
            // The number of callbacks is the number of results expected before we are done.
            int expectedResultCount = leftCallbacks.size();
            // Guard against the copy being empty: polling would yield null and a task would be
            // submitted for a null callback. In that case fall through to the no-callback handling.
            if (expectedResultCount > 0) {
                List<PluginOnAuthenticationResult> results = new ArrayList<>(expectedResultCount);
                OnAuthenticationCallback callback = leftCallbacks.poll();
                submitTask(ctx, callback, event.getClientCredentials(), event.getConnect(),
                        leftCallbacks, results, expectedResultCount);
                return;
            }
        }

        if (needAllPluginsToReturnTrue()) {
            // Every plugin must approve, but none can be asked: refuse.
            ctx.pipeline().fireUserEventTriggered(new PluginOnAuthenticationCompleted(
                    event.getConnect(), event.getClientCredentials(), ReturnCode.REFUSED_NOT_AUTHORIZED,
                    new AuthenticationException("No OnAuthenticationCallback available", ReturnCode.REFUSED_NOT_AUTHORIZED)));
        } else {
            // Otherwise the client counts as authenticated.
            ctx.pipeline().fireUserEventTriggered(new PluginOnAuthenticationCompleted(
                    event.getConnect(), event.getClientCredentials(), ReturnCode.ACCEPTED));
        }
    }

    /**
     * Evaluates the newest callback result. Finishes authentication when the outcome is already
     * decided or when all expected results are in; otherwise submits the next callback.
     */
    private void onAuthenticationCallbackCompleted(ChannelHandlerContext ctx,
                                                   PluginOnAuthenticationCallbackCompleted event) {
        List<PluginOnAuthenticationResult> results = event.getResults();
        // The result that was appended last belongs to the callback that just finished.
        PluginOnAuthenticationResult lastResult = results.get(results.size() - 1);
        Connect connect = event.getConnect();
        ClientCredentials clientCredentials = event.getClientCredentials();

        // The outcome is final when the callback refused, or when its verdict alone settles it:
        // a success while one success suffices, or a failure while all must succeed.
        if (lastResult.isRefused() || lastResult.isAuthenticated() != needAllPluginsToReturnTrue()) {
            completeAndRemove(ctx, new PluginOnAuthenticationCompleted(
                    connect, clientCredentials, lastResult.getReturnCode(), lastResult.getException()));
            return;
        }

        // All callbacks have reported: derive the verdict from the collected results.
        if (results.size() == event.getExpectedResultCount()) {
            ReturnCode returnCode = accepted(results) ? ReturnCode.ACCEPTED : ReturnCode.REFUSED_NOT_AUTHORIZED;
            completeAndRemove(ctx, new PluginOnAuthenticationCompleted(connect, clientCredentials, returnCode));
            return;
        }

        // Still callbacks left: run the next one.
        Queue<OnAuthenticationCallback> leftCallbacks = event.getLeftCallbacks();
        OnAuthenticationCallback callback = leftCallbacks.poll();
        submitTask(ctx, callback, clientCredentials, connect,
                leftCallbacks, results, event.getExpectedResultCount());
    }

    /**
     * Fires the final authentication event on the pipeline and then removes this handler from
     * the pipeline if it is still registered there.
     */
    private void completeAndRemove(ChannelHandlerContext ctx, PluginOnAuthenticationCompleted completed) {
        ctx.pipeline().fireUserEventTriggered(completed);
        if (ctx.pipeline().get(getClass()) != null) {
            ctx.pipeline().remove(this);
        }
    }

    /**
     * Submits one callback to the callback executor and registers a {@link ResultCallback} that
     * is invoked on {@code ctx.executor().parent()} once the returned future completes.
     */
    private void submitTask(ChannelHandlerContext ctx,
                            OnAuthenticationCallback callback,
                            ClientCredentials clientCredentials,
                            Connect connect,
                            Queue<OnAuthenticationCallback> leftCallbacks,
                            List<PluginOnAuthenticationResult> results,
                            int expectedResultCount) {
        ListenableFuture<PluginOnAuthenticationResult> future =
                this.callbackExecutor.submit(createTask(callback, clientCredentials));
        ResultCallback resultCallback =
                createResultCallback(ctx, clientCredentials, connect, leftCallbacks, results, expectedResultCount);
        // The result is delivered asynchronously through the callback; nothing blocks here.
        Futures.addCallback(future, resultCallback, ctx.executor().parent());
    }

    /** Creates the task that invokes a single authentication callback. */
    @NotNull
    @VisibleForTesting
    Task createTask(OnAuthenticationCallback callback,
                    ClientCredentials clientCredentials) {
        return new Task(callback, clientCredentials, this.metrics);
    }

    /** Creates the future callback that turns a task outcome into a pipeline event. */
    @NotNull
    @VisibleForTesting
    ResultCallback createResultCallback(ChannelHandlerContext ctx,
                                        ClientCredentials clientCredentials,
                                        Connect connect,
                                        Queue<OnAuthenticationCallback> leftCallbacks,
                                        List<PluginOnAuthenticationResult> results,
                                        int expectedResultCount) {
        return new ResultCallback(ctx, clientCredentials, connect,
                leftCallbacks, results, expectedResultCount);
    }

    /** Reads the internal setting that requires every plugin to authenticate the client. */
    private boolean needAllPluginsToReturnTrue() {
        return this.hiveMQConfigurationService.internalConfiguration()
                .getBoolean(Internals.PLUGIN_AUTHENTICATION_NEED_ALL_PLUGINS_TO_RETURN_TRUE);
    }

    /**
     * Derives the overall verdict from all results: with "need all" every result must be
     * authenticated, without it a single authenticated result is enough.
     */
    private boolean accepted(List<PluginOnAuthenticationResult> results) {
        boolean needAllPluginsToReturnTrue = needAllPluginsToReturnTrue();
        for (PluginOnAuthenticationResult result : results) {
            if (!needAllPluginsToReturnTrue && result.isAuthenticated()) {
                return true;
            }
            if (needAllPluginsToReturnTrue && !result.isAuthenticated()) {
                return false;
            }
        }
        // No deciding element found: all passed (need-all) or none passed (any-suffices).
        return needAllPluginsToReturnTrue;
    }

    /**
     * Future callback that appends the outcome of one authentication task to the shared result
     * list and fires a {@link PluginOnAuthenticationCallbackCompleted} event on the pipeline.
     */
    @VisibleForTesting
    static class ResultCallback implements FutureCallback<PluginOnAuthenticationResult> {
        private final ChannelHandlerContext ctx;
        private final ClientCredentials clientCredentials;
        private final Connect connect;
        private final Queue<OnAuthenticationCallback> leftCallbacks;
        private final List<PluginOnAuthenticationResult> results;
        private final int expectedResultCount;

        public ResultCallback(ChannelHandlerContext ctx,
                              ClientCredentials clientCredentials,
                              Connect connect,
                              Queue<OnAuthenticationCallback> leftCallbacks,
                              List<PluginOnAuthenticationResult> results,
                              int expectedResultCount) {
            this.ctx = ctx;
            this.clientCredentials = clientCredentials;
            this.connect = connect;
            this.leftCallbacks = leftCallbacks;
            this.results = results;
            this.expectedResultCount = expectedResultCount;
        }

        /** The task completed normally: record its result and notify the handler. */
        @Override
        public void onSuccess(@Nullable PluginOnAuthenticationResult result) {
            this.results.add(result);
            fireCallbackCompleted();
        }

        /**
         * The task failed with an exception: record a refused result and notify the handler.
         * The throwable is passed to the logger so that the log really contains the details the
         * generated exception message refers to.
         */
        @Override
        public void onFailure(Throwable t) {
            LOGGER.error("OnAuthenticationCallback failed. Skipping all other handlers", t);
            this.results.add(new PluginOnAuthenticationResult(false, ReturnCode.REFUSED_NOT_AUTHORIZED, true,
                    new AuthenticationException(t.getMessage() + " See log for more information", ReturnCode.REFUSED_NOT_AUTHORIZED)));
            fireCallbackCompleted();
        }

        /** Fires the "one callback completed" event carrying the current state. */
        private void fireCallbackCompleted() {
            this.ctx.pipeline().fireUserEventTriggered(
                    new PluginOnAuthenticationCallbackCompleted(this.leftCallbacks, this.results, this.connect,
                            this.clientCredentials, this.expectedResultCount));
        }
    }

    /**
     * Task that invokes one {@link OnAuthenticationCallback} and converts its return value or
     * exception into a {@link PluginOnAuthenticationResult}. The call is timed with the plugin
     * authentication timer.
     */
    @VisibleForTesting
    static class Task implements CallableTask<PluginOnAuthenticationResult> {
        private final OnAuthenticationCallback callback;
        private final ClientCredentials clientCredentials;
        private final Metrics metrics;

        public Task(@NotNull OnAuthenticationCallback callback,
                    ClientCredentials clientCredentials,
                    Metrics metrics) {
            this.callback = callback;
            this.clientCredentials = clientCredentials;
            this.metrics = metrics;
        }

        @Override
        public PluginOnAuthenticationResult call() throws Exception {
            // Timing context for this plugin invocation; stopped in the finally block so that it
            // is released on every path, including a failure inside one of the catch blocks.
            Timer.Context timer = this.metrics.pluginTimerAuthentication().time();
            try {
                // Ask the plugin. A null return is unboxed by the conditional below; the
                // resulting NullPointerException is handled by the Throwable branch.
                Boolean authenticated = this.callback.checkCredentials(this.clientCredentials);
                return new PluginOnAuthenticationResult(authenticated,
                        authenticated ? ReturnCode.ACCEPTED : ReturnCode.REFUSED_NOT_AUTHORIZED, false);
            } catch (AuthenticationException e) {
                // The plugin explicitly rejected the client: refuse with the plugin's return code.
                LOGGER.debug("An exception was raised when calling the OnAuthenticationCallback {}:", this.callback.getClass(), e);
                return new PluginOnAuthenticationResult(false, e.getReturnCode(), true, e);
            } catch (Throwable t) {
                // Anything else thrown by the plugin: log it and refuse.
                LOGGER.error("Unhandled Exception in OnAuthenticationCallback {}. Skipping all other handlers", this.callback.getClass());
                PluginExceptionUtils.log(t);
                return new PluginOnAuthenticationResult(false, ReturnCode.REFUSED_NOT_AUTHORIZED, true,
                        new AuthenticationException(t.getMessage() + " See log for more information", ReturnCode.REFUSED_NOT_AUTHORIZED));
            } finally {
                timer.stop();
            }
        }

        /** Returns the runtime class of the wrapped callback. */
        @NotNull
        public Class callbackType() {
            return this.callback.getClass();
        }
    }
}
其他事件类
这几个类都是简单的POJO,其作用在前面的源码注释中已经描述过,这里就不再逐一注释了。
/**
 * User event that requests plugin authentication for a CONNECT. Carries the CONNECT message
 * together with the client credentials to check.
 */
public class PluginOnAuthentication {

    private final Connect connect;
    private final ClientCredentials clientCredentials;

    /**
     * @param connect           the CONNECT message being processed
     * @param clientCredentials the credentials to authenticate
     */
    public PluginOnAuthentication(Connect connect, ClientCredentials clientCredentials) {
        this.connect = connect;
        this.clientCredentials = clientCredentials;
    }

    /** @return the CONNECT message being processed */
    public Connect getConnect() {
        return this.connect;
    }

    /** @return the credentials to authenticate */
    public ClientCredentials getClientCredentials() {
        return this.clientCredentials;
    }
}
/**
 * User event signalling that a single authentication callback has finished. Carries the
 * callbacks that have not run yet, the results collected so far, the number of results
 * expected in total, and the CONNECT message with its client credentials.
 */
public class PluginOnAuthenticationCallbackCompleted {

    private final Queue<OnAuthenticationCallback> leftCallbacks;
    private final int expectedResultCount;
    private final List<PluginOnAuthenticationResult> results;
    private final Connect connect;
    private final ClientCredentials clientCredentials;

    /**
     * @param leftCallbacks       callbacks that still have to be invoked
     * @param results             results collected so far
     * @param connect             the CONNECT message being processed
     * @param clientCredentials   the credentials being authenticated
     * @param expectedResultCount total number of results expected
     */
    public PluginOnAuthenticationCallbackCompleted(Queue<OnAuthenticationCallback> leftCallbacks,
                                                   List<PluginOnAuthenticationResult> results,
                                                   Connect connect,
                                                   ClientCredentials clientCredentials,
                                                   int expectedResultCount) {
        this.leftCallbacks = leftCallbacks;
        this.results = results;
        this.connect = connect;
        this.clientCredentials = clientCredentials;
        this.expectedResultCount = expectedResultCount;
    }

    /** @return the callbacks that still have to be invoked (the same queue instance that was passed in) */
    public Queue<OnAuthenticationCallback> getLeftCallbacks() {
        return this.leftCallbacks;
    }

    /** @return the total number of results expected */
    public int getExpectedResultCount() {
        return this.expectedResultCount;
    }

    /** @return the results collected so far (the same list instance that was passed in) */
    public List<PluginOnAuthenticationResult> getResults() {
        return this.results;
    }

    /** @return the CONNECT message being processed */
    public Connect getConnect() {
        return this.connect;
    }

    /** @return the credentials being authenticated */
    public ClientCredentials getClientCredentials() {
        return this.clientCredentials;
    }
}
/**
 * User event signalling that plugin authentication as a whole has finished. Carries the
 * CONNECT message, the client credentials, the resulting return code and, optionally, the
 * exception that led to that result.
 */
public class PluginOnAuthenticationCompleted {

    private final Connect connect;
    private final ClientCredentials clientCredentials;
    private final ReturnCode returnCode;
    private final AuthenticationException exception;

    /**
     * Creates a completed event without an exception ({@link #getException()} returns
     * {@code null}).
     *
     * @param connect           the CONNECT message being processed
     * @param clientCredentials the credentials that were checked
     * @param returnCode        the outcome of authentication
     */
    public PluginOnAuthenticationCompleted(Connect connect,
                                           ClientCredentials clientCredentials,
                                           ReturnCode returnCode) {
        this(connect, clientCredentials, returnCode, null);
    }

    /**
     * @param connect           the CONNECT message being processed
     * @param clientCredentials the credentials that were checked
     * @param returnCode        the outcome of authentication
     * @param exception         the exception associated with the outcome
     */
    public PluginOnAuthenticationCompleted(Connect connect,
                                           ClientCredentials clientCredentials,
                                           ReturnCode returnCode,
                                           AuthenticationException exception) {
        this.connect = connect;
        this.clientCredentials = clientCredentials;
        this.returnCode = returnCode;
        this.exception = exception;
    }

    /** @return the CONNECT message being processed */
    public Connect getConnect() {
        return this.connect;
    }

    /** @return the credentials that were checked */
    public ClientCredentials getClientCredentials() {
        return this.clientCredentials;
    }

    /** @return the outcome of authentication */
    public ReturnCode getReturnCode() {
        return this.returnCode;
    }

    /** @return the exception associated with the outcome; {@code null} when the three-argument constructor was used */
    public AuthenticationException getException() {
        return this.exception;
    }
}
MQTT交流群:221405150
以上是关于MQTT---HiveMQ源码详解(十三)Netty-MQTT消息事件处理(源码举例解读)的主要内容,如果未能解决你的问题,请参考以下文章
MQTT---HiveMQ源码详解(十四)Persistence-LocalPersistence