mqtt协议-broker之moqutte源码研究五之UNSUBSCRIBE与DISCONN报文处理

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了mqtt协议-broker之moqutte源码研究五之UNSUBSCRIBE与DISCONN报文处理相关的知识,希望对你有一定的参考价值。

本文讲解moquette对UNSUBSCRIBE和DISCONNECT的处理

先说UNSUBSCRIBE,代码比较简单

    public void processUnsubscribe(Channel channel, MqttUnsubscribeMessage msg) {
    List<String> topics = msg.payload().topics();
    String clientID = NettyUtils.clientID(channel);

    LOG.info("Processing UNSUBSCRIBE message. CId={}, topics={}", clientID, topics);

    ClientSession clientSession = m_sessionsStore.sessionForClient(clientID);
    for (String t : topics) {
        Topic topic = new Topic(t);
        boolean validTopic = topic.isValid();
        if (!validTopic) {
            // close the connection, not valid topicFilter is a protocol violation
            channel.close();
            LOG.error("Topic filter is not valid. CId={}, topics={}, badTopicFilter={}", clientID, topics, topic);
            return;
        }
        if(LOG.isDebugEnabled()){
            LOG.debug("Removing subscription. CId={}, topic={}", clientID, topic);
        }
        subscriptions.removeSubscription(topic, clientID);
        clientSession.unsubscribeFrom(topic);
        String username = NettyUtils.userName(channel);
        m_interceptor.notifyTopicUnsubscribed(topic.toString(), clientID, username);
    }

    // ack the client
    int messageID = msg.variableHeader().messageId();
    MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, AT_LEAST_ONCE, false, 0);
    MqttUnsubAckMessage ackMessage = new MqttUnsubAckMessage(fixedHeader, from(messageID));

    LOG.info("Sending UNSUBACK message. CId={}, topics={}, messageId={}", clientID, topics, messageID);
    channel.writeAndFlush(ackMessage);
}

主要分为以下几步
1.从目录树下,移除该client的订阅,这个移除过程有点复杂,后面单独一篇专门讲解topic树
2.清除ClientSession里面的订阅,包括Set<Subscription> subscriptions,同时还得移除ISubscriptionsStore里面的Map<Topic, Subscription> subscriptions
3.唤醒拦截器
4.返回UNSUBACK ,这里注意UNSUBACK 是没有payload的。

再说DISCONNECT的处理

public void processDisconnect(Channel channel) throws InterruptedException {
    final String clientID = NettyUtils.clientID(channel);
    LOG.info("Processing DISCONNECT message. CId={}", clientID);
    channel.flush();
    final ConnectionDescriptor existingDescriptor = this.connectionDescriptors.getConnection(clientID);
    if (existingDescriptor == null) {
        // another client with same ID removed the descriptor, we must exit
        channel.close();
        return;
    }

    if (existingDescriptor.doesNotUseChannel(channel)) {
        // another client saved it‘s descriptor, exit
        LOG.warn("Another client is using the connection descriptor. Closing connection. CId={}", clientID);
        existingDescriptor.abort();
        return;
    }

    if (!removeSubscriptions(existingDescriptor, clientID)) {
        LOG.warn("Unable to remove subscriptions. Closing connection. CId={}", clientID);
        existingDescriptor.abort();
        return;
    }

    if (!dropStoredMessages(existingDescriptor, clientID)) {
        LOG.warn("Unable to drop stored messages. Closing connection. CId={}", clientID);
        existingDescriptor.abort();
        return;
    }

    if (!cleanWillMessageAndNotifyInterceptor(existingDescriptor, clientID)) {
        LOG.warn("Unable to drop will message. Closing connection. CId={}", clientID);
        existingDescriptor.abort();
        return;
    }

    if (!existingDescriptor.close()) {
        LOG.info("The connection has been closed. CId={}", clientID);
        return;
    }

    boolean stillPresent = this.connectionDescriptors.removeConnection(existingDescriptor);
    if (!stillPresent) {
        // another descriptor was inserted
        LOG.warn("Another descriptor has been inserted. CId={}", clientID);
        return;
    }

    LOG.info("The DISCONNECT message has been processed. CId={}", clientID);
}

1.检查连接描述符是否还存在,如果不存在,说明之前已经有客户端删除了它,直接关闭通道
2.判断这个client的连接描述符是不是,是不是还是当前使用这个通道的client?作者要先防止这种情况呢?先卖个关子,后面的第6条会说明
3.清除订阅请求,这里面好像只清楚了不要求保存会话信息的clientsession里面的ISessionsStore里面的Map<Topic, Subscription> subscriptions,而并没有清除ClientSession里面的Set<Subscription> subscriptions和topic树里面的订阅,这能够解释http://blog.51cto.com/13579730/2073914 这篇文章结尾讨论的问题了,只有Map<Topic, Subscription> subscriptions的订阅才是最准确的。
4.丢弃存储的消息,这里面也只是会丢弃不要去保存会话信息的消息
5.清除遗愿消息,对于遗愿消息,这里稍微啰嗦一点,遗愿消息是在初次连接的存储到ProtocolProcessor的ConcurrentMap<String, WillMessage> m_willStore这里面的,那么什么时候发送给订阅者呢?看下面

    io.moquette.server.netty.NettyMQTTHandler#channelInactive
    @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    String clientID = NettyUtils.clientID(ctx.channel());
    if (clientID != null && !clientID.isEmpty()) {
        LOG.info("Notifying connection lost event. MqttClientId = {}.", clientID);
        m_processor.processConnectionLost(clientID, ctx.channel());
    }
    ctx.close();
}
    说明是当netty检测到通道不活跃的时候通知ProtocolProcessor处理ConnectionLost事件的。
    public void processConnectionLost(String clientID, Channel channel) {
    LOG.info("Processing connection lost event. CId={}", clientID);
    ConnectionDescriptor oldConnDescr = new ConnectionDescriptor(clientID, channel, true);
    connectionDescriptors.removeConnection(oldConnDescr);//移除连接描述符
    // publish the Will message (if any) for the clientID
    if (m_willStore.containsKey(clientID)) {
        WillMessage will = m_willStore.get(clientID);
        forwardPublishWill(will, clientID);//发布遗愿消息
        m_willStore.remove(clientID);//移除遗愿消息存储
    }

    String username = NettyUtils.userName(channel);
    m_interceptor.notifyClientConnectionLost(clientID, username);//唤醒拦截器
}
    在以下这种情况下会发布遗愿消息
    遗嘱消息发布的条件,包括但不限于:
    服务端检测到了一个I/O错误或者网络故障。
    客户端在保持连接(Keep Alive)的时间内未能通讯。
    客户端没有先发送DISCONNECT报文直接关闭了网络连接。
    由于协议错误服务端关闭了网络连接。

    另外说明一下,遗愿消息是可以设置消息等级的,而且可以被设置成retain消息

6.连接描述符集合里面清除该通道对应的连接描述符,这里有一点很容易误解,强调一下

    boolean stillPresent = this.connectionDescriptors.removeConnection(existingDescriptor);
    if (!stillPresent) {
        // another descriptor was inserted
        LOG.warn("Another descriptor has been inserted. CId={}", clientID);
        return;
    }

    作者调用的是ConcurrentMap里面的boolean remove(Object key, Object value);这个方法要求key存在,且value 与预期的一样才会删除,也就说,是有可能存在的,key一样而value不一样的情况的,什么时候会出现?答案是client在两个设备上先后登陆,这个时候由于是存在一个map里面的所以后面的登陆所创建的连接描述符会覆盖前面的一个。当然这里面,也可以在覆盖之前强制断开之前那个连接,但是moquette并没有这么做,具体看源码io.moquette.server.ConnectionDescriptorStore#addConnection

也就说说moquette是允许存在一个账号多设备登陆的。将入client先后在A,B两个设备上建立连接,B连接会覆盖A连接,这个时候A连接虽然还在,但其实是永远也收不到消息的,因为发送消息的时候,会以ConnectionDescriptorStore里面存储的为准,具体看源码
io.moquette.server.ConnectionDescriptorStore#sendMessage,也就是说A连接会无谓的占用broker的资源,个人觉得这样并不好,也非常没有必要,大家可以自行改进。
现在大家就能够理解上面的第2步了,因为这个就是为双登陆的情况下,被覆盖的那个连接准备的。

以上是关于mqtt协议-broker之moqutte源码研究五之UNSUBSCRIBE与DISCONN报文处理的主要内容,如果未能解决你的问题,请参考以下文章

mqtt协议-broker之moqutte源码研究二之SUBSCRIBE报文处理

mqtt协议-broker之moqutte源码研究六之集群

mqtt协议-broker之moqutte源码研究二之Connect报文处理

mqtt协议-broker之moqutte源码研究五之UNSUBSCRIBE与DISCONN报文处理

MQTT 系列之 MQTT broker 的连接

物联网之MQTT3.1.1和MQTT5协议 (20) MQTT 相关开源实现Server或Broker列表(会不断更新)