MQTT Implementation

Posted iiidragon

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MQTT Implementation相关的知识,希望对你有一定的参考价值。

1. MQTT Message

技术图片 

  技术图片 

public class MqttMessage{
    private final MqttFixedHeader mqttFixedHeader;
    private final Object variableHeader;
    private final Object payload;
}

public final class MqttFixedHeader {
    private final MqttMessageType messageType;
    private final boolean isDup;
    private final MqttQoS qosLevel;
    private final boolean isRetain;
    private final int remainingLength;
}

public enum MqttMessageType {
    CONNECT(1),
    CONNACK(2),
    PUBLISH(3),
    PUBACK(4),
    PUBREC(5),
    PUBREL(6),
    PUBCOMP(7),
    SUBSCRIBE(8),
    SUBACK(9),
    UNSUBSCRIBE(10),
    UNSUBACK(11),
    PINGREQ(12),
    PINGRESP(13),
    DISCONNECT(14);
}

2. Decode MQTT FixedHeader

技术图片

技术图片

  技术图片

技术图片

 技术图片

Max of remaining length is 268435455 =   255 M

private static MqttFixedHeader decodeFixedHeader(ByteBuf buffer) {
    //read 1 byte
    short b1 = buffer.readUnsignedByte();
    
    //mqtt type 
    MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);
    
    //dup flag if PUBLISH TYPE
    boolean dupFlag = (b1 & 0x08) == 0x08;
    
    //QoS if PUBLISH TYPE
    int qosLevel = (b1 & 0x06) >> 1;
    
    //RETAIN if PUBLISH TYPE
    boolean retain = (b1 & 0x01) != 0;

    //read length of payload
    int remainingLength = 0;
    int multiplier = 1;
    short digit;
    int loops = 0;
    do {
        //read 1 byte
        digit = buffer.readUnsignedByte();
        
        // 127 = 0x7F
        remainingLength += (digit & 127) * multiplier;
        
        //128 = 0x80
        multiplier *= 128;
        
        //max bytes is 4 
        loops++;
    } while ((digit & 128) != 0 && loops < 4);

    // MQTT protocol limits Remaining Length to 4 bytes
    if (loops == 4 && (digit & 128) != 0) {
        throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ‘)‘);
    }
    
    //construct MqttFixedHeader
    MqttFixedHeader decodedFixedHeader = new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);
    return validateFixedHeader(resetUnusedFields(decodedFixedHeader));
}

 

3. Decode MQTT VariableHeader

技术图片  

private static Result<Integer> decodeMsbLsb(ByteBuf buffer, int min, int max) {
    short msbSize = buffer.readUnsignedByte();
    short lsbSize = buffer.readUnsignedByte();
    final int numberOfBytesConsumed = 2;
    int result = msbSize << 8 | lsbSize;
    if (result < min || result > max) {
        result = -1;
    }
    return new Result<Integer>(result, numberOfBytesConsumed);

 

private static Result<?> decodeVariableHeader(ByteBuf buffer, MqttFixedHeader mqttFixedHeader) {
    switch (mqttFixedHeader.messageType()) {
        case CONNECT:
            return decodeConnectionVariableHeader(buffer);
        case CONNACK:
            return decodeConnAckVariableHeader(buffer);
        case SUBSCRIBE:
        case UNSUBSCRIBE:
        case SUBACK:
        case UNSUBACK:
        case PUBACK:
        case PUBREC:
        case PUBCOMP:
        case PUBREL:
            return decodeMessageIdVariableHeader(buffer);
        case PUBLISH:
            return decodePublishVariableHeader(buffer, mqttFixedHeader);
        case PINGREQ:
        case PINGRESP:
        case DISCONNECT:
            // Empty variable header
            return new Result<Object>(null, 0);
    }
    //should never reach here
    return new Result<Object>(null, 0); 
}

3.1 If Connect Type

技术图片 

private static Result<Integer> decodeMsbLsb(ByteBuf buffer, int min, int max) {
    short msbSize = buffer.readUnsignedByte();
    short lsbSize = buffer.readUnsignedByte();
    final int numberOfBytesConsumed = 2;
    int result = msbSize << 8 | lsbSize;
    if (result < min || result > max) {
        result = -1;
    }
    return new Result<Integer>(result, numberOfBytesConsumed);
}

 

private static Result<String> decodeString(ByteBuf buffer, int minBytes, int maxBytes) {
    final Result<Integer> decodedSize = decodeMsbLsb(buffer);
    int size = decodedSize.value;
    int numberOfBytesConsumed = decodedSize.numberOfBytesConsumed;
    if (size < minBytes || size > maxBytes) {
        buffer.skipBytes(size);
        numberOfBytesConsumed += size;
        return new Result<String>(null, numberOfBytesConsumed);
    }
    String s = buffer.toString(buffer.readerIndex(), size, CharsetUtil.UTF_8);
    buffer.skipBytes(size);
    numberOfBytesConsumed += size;
    return new Result<String>(s, numberOfBytesConsumed);
}

 

 技术图片 

 技术图片 

 技术图片 

private static Result<MqttConnectVariableHeader> decodeConnectionVariableHeader(ByteBuf buffer) {
    final Result<String> protoString = decodeString(buffer);
    int numberOfBytesConsumed = protoString.numberOfBytesConsumed;

    //read level
    final byte protocolLevel = buffer.readByte();
    numberOfBytesConsumed += 1;
    
    //version
    final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel);
    
    //read Connect Flags
    final int b1 = buffer.readUnsignedByte();
    numberOfBytesConsumed += 1;

    //read keepAlive
    final Result<Integer> keepAlive = decodeMsbLsb(buffer);
    numberOfBytesConsumed += keepAlive.numberOfBytesConsumed;
    
    final boolean hasUserName = (b1 & 0x80) == 0x80;
    final boolean hasPassword = (b1 & 0x40) == 0x40;
    final boolean willRetain = (b1 & 0x20) == 0x20;
    final int willQos = (b1 & 0x18) >> 3;
    final boolean willFlag = (b1 & 0x04) == 0x04;
    final boolean cleanSession = (b1 & 0x02) == 0x02;
    
    if (mqttVersion == MqttVersion.MQTT_3_1_1) {
        final boolean zeroReservedFlag = (b1 & 0x01) == 0x0;
        if (!zeroReservedFlag) {
            // MQTT v3.1.1: The Server MUST validate that the reserved flag in the CONNECT Control Packet is
            // set to zero and disconnect the Client if it is not zero.
            // See http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349230
            throw new DecoderException("non-zero reserved flag");
        }
    }

    final MqttConnectVariableHeader mqttConnectVariableHeader = new MqttConnectVariableHeader(
            mqttVersion.protocolName(),
            mqttVersion.protocolLevel(),
            hasUserName,
            hasPassword,
            willRetain,
            willQos,
            willFlag,
            cleanSession,
            keepAlive.value);
    return new Result<MqttConnectVariableHeader>(mqttConnectVariableHeader, numberOfBytesConsumed);
}

3.2 If Publish Type

技术图片 

private static Result<MqttPublishVariableHeader> decodePublishVariableHeader(ByteBuf buffer,MqttFixedHeader mqttFixedHeader) {
    final Result<String> decodedTopic = decodeString(buffer);
    if (!isValidPublishTopicName(decodedTopic.value)) {
        throw new DecoderException("invalid publish topic name: " + decodedTopic.value + " (contains wildcards)");
    }
    int numberOfBytesConsumed = decodedTopic.numberOfBytesConsumed;

    int messageId = -1;
    if (mqttFixedHeader.qosLevel().value() > 0) {
        final Result<Integer> decodedMessageId = decodeMessageId(buffer);
        messageId = decodedMessageId.value;
        numberOfBytesConsumed += decodedMessageId.numberOfBytesConsumed;
    }
    final MqttPublishVariableHeader mqttPublishVariableHeader =    new MqttPublishVariableHeader(decodedTopic.value, messageId);
    return new Result<MqttPublishVariableHeader>(mqttPublishVariableHeader, numberOfBytesConsumed);
}

3.3 If SUBSCRIBE:UNSUBSCRIBE:SUBACK:UNSUBACK:PUBACK:PUBREC:PUBCOMP:PUBREL Type

技术图片 

private static Result<MqttMessageIdVariableHeader> decodeMessageIdVariableHeader(ByteBuf buffer) {
    final Result<Integer> messageId = decodeMessageId(buffer);
    return new Result<MqttMessageIdVariableHeader>(MqttMessageIdVariableHeader.from(messageId.value),messageId.numberOfBytesConsumed);
}

3.4 If CONNACK Type

技术图片 

private static Result<MqttConnAckVariableHeader> decodeConnAckVariableHeader(ByteBuf buffer) {
    final boolean sessionPresent = (buffer.readUnsignedByte() & 0x01) == 0x01;
    //ack return code
    byte returnCode = buffer.readByte();
    final int numberOfBytesConsumed = 2;
    final MqttConnAckVariableHeader mqttConnAckVariableHeader =
            new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(returnCode), sessionPresent);
    return new Result<MqttConnAckVariableHeader>(mqttConnAckVariableHeader, numberOfBytesConsumed);
}

 4. Payload Parser

4.1 If CONNECT Type

  Same format if string with MSB & LSB & Data

技术图片

 

 

 

private static Result<MqttConnectPayload> decodeConnectionPayload(ByteBuf buffer,MqttConnectVariableHeader mqttConnectVariableHeader) {
    final Result<String> decodedClientId = decodeString(buffer);
    final String decodedClientIdValue = decodedClientId.value;
    final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),(byte) mqttConnectVariableHeader.version());
    
    if (!isValidClientId(mqttVersion, decodedClientIdValue)) {
        throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + decodedClientIdValue);
    }
    int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;

    Result<String> decodedWillTopic = null;
    Result<byte[]> decodedWillMessage = null;
    if (mqttConnectVariableHeader.isWillFlag()) {
        decodedWillTopic = decodeString(buffer, 0, 32767);
        numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;
        decodedWillMessage = decodeByteArray(buffer);
        numberOfBytesConsumed += decodedWillMessage.numberOfBytesConsumed;
    }
    Result<String> decodedUserName = null;
    Result<byte[]> decodedPassword = null;
    if (mqttConnectVariableHeader.hasUserName()) {
        decodedUserName = decodeString(buffer);
        numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed;
    }
    if (mqttConnectVariableHeader.hasPassword()) {
        decodedPassword = decodeByteArray(buffer);
        numberOfBytesConsumed += decodedPassword.numberOfBytesConsumed;
    }

    final MqttConnectPayload mqttConnectPayload =
            new MqttConnectPayload(
                    decodedClientId.value,
                    decodedWillTopic != null ? decodedWillTopic.value : null,
                    decodedWillMessage != null ? decodedWillMessage.value : null,
                    decodedUserName != null ? decodedUserName.value : null,
                    decodedPassword != null ? decodedPassword.value : null);
    return new Result<MqttConnectPayload>(mqttConnectPayload, numberOfBytesConsumed);
}

4.2 If SUBSCRIBE Type

技术图片

private static Result<MqttSubscribePayload> decodeSubscribePayload(ByteBuf buffer,int bytesRemainingInVariablePart) {
    final List<MqttTopicSubscription> subscribeTopics = new ArrayList<MqttTopicSubscription>();
    int numberOfBytesConsumed = 0;
    while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
        final Result<String> decodedTopicName = decodeString(buffer);
        numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
        int qos = buffer.readUnsignedByte() & 0x03;
        numberOfBytesConsumed++;
        subscribeTopics.add(new MqttTopicSubscription(decodedTopicName.value, MqttQoS.valueOf(qos)));
    }
    return new Result<MqttSubscribePayload>(new MqttSubscribePayload(subscribeTopics), numberOfBytesConsumed);
}

4.3 If SUBSCRIBE-ACK Type

技术图片

private static Result<MqttSubAckPayload> decodeSubackPayload(ByteBuf buffer,int bytesRemainingInVariablePart) {
    final List<Integer> grantedQos = new ArrayList<Integer>();
    int numberOfBytesConsumed = 0;
    while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
        int qos = buffer.readUnsignedByte();
        if (qos != MqttQoS.FAILURE.value()) {
            qos &= 0x03;
        }
        numberOfBytesConsumed++;
        grantedQos.add(qos);
    }
    return new Result<MqttSubAckPayload>(new MqttSubAckPayload(grantedQos), numberOfBytesConsumed);
}

4.4 If PUBLISH Type

The Payload contains the Application Message that is being published. The content and format of the data is application specific.
The length of the payload can be calculated by subtracting the length of the variable header from the Remaining Length field
that is in the Fixed Header. It is valid for a PUBLISH Packet to contain a zero length payload.

bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
bytesRemainingInVariablePart -= mqttVariableHeader.numberOfBytesConsumed;
private static Result<ByteBuf> decodePublishPayload(ByteBuf buffer, int bytesRemainingInVariablePart) {
    ByteBuf b = buffer.readRetainedSlice(bytesRemainingInVariablePart);
    return new Result<ByteBuf>(b, bytesRemainingInVariablePart);
}

 

以上是关于MQTT Implementation的主要内容,如果未能解决你的问题,请参考以下文章

接口自动化支持插件扩展,提供MQTT插件,MeterSphere开源持续测试平台v1.13.0发布

滚动包含 x 个 GridView 的整个片段

Linux C的MQTT测试代码编写 - 跨主机的MQTT客户端通信

MT7621加 OPENWRT 移植MQTT(paho.mqtt.c) 进行数据的收发

paho.mqtt.python模块怎么安装

在@implementation 中声明变量