MQTT Implementation
Posted iiidragon
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MQTT Implementation相关的知识,希望对你有一定的参考价值。
1. MQTT Message
/**
 * A decoded MQTT message: a fixed header plus an optional variable header and payload.
 * The variable header and payload are typed as {@code Object} because their concrete
 * type depends on the message type.
 */
public class MqttMessage {
    private final MqttFixedHeader mqttFixedHeader;
    private final Object variableHeader;
    private final Object payload;

    public MqttMessage(MqttFixedHeader mqttFixedHeader, Object variableHeader, Object payload) {
        this.mqttFixedHeader = mqttFixedHeader;
        this.variableHeader = variableHeader;
        this.payload = payload;
    }

    public MqttFixedHeader fixedHeader() {
        return mqttFixedHeader;
    }

    public Object variableHeader() {
        return variableHeader;
    }

    public Object payload() {
        return payload;
    }
}

/**
 * The fixed header that starts every MQTT message: message type, the DUP/QoS/RETAIN
 * flags and the remaining length.
 */
public final class MqttFixedHeader {
    private final MqttMessageType messageType;
    private final boolean isDup;
    private final MqttQoS qosLevel;
    private final boolean isRetain;
    private final int remainingLength;

    public MqttFixedHeader(MqttMessageType messageType, boolean isDup, MqttQoS qosLevel,
                           boolean isRetain, int remainingLength) {
        this.messageType = messageType;
        this.isDup = isDup;
        this.qosLevel = qosLevel;
        this.isRetain = isRetain;
        this.remainingLength = remainingLength;
    }

    public MqttMessageType messageType() {
        return messageType;
    }

    public boolean isDup() {
        return isDup;
    }

    public MqttQoS qosLevel() {
        return qosLevel;
    }

    public boolean isRetain() {
        return isRetain;
    }

    public int remainingLength() {
        return remainingLength;
    }
}

/**
 * MQTT control packet types together with the numeric code each one is declared with.
 */
public enum MqttMessageType {
    CONNECT(1),
    CONNACK(2),
    PUBLISH(3),
    PUBACK(4),
    PUBREC(5),
    PUBREL(6),
    PUBCOMP(7),
    SUBSCRIBE(8),
    SUBACK(9),
    UNSUBSCRIBE(10),
    UNSUBACK(11),
    PINGREQ(12),
    PINGRESP(13),
    DISCONNECT(14);

    private final int value;

    MqttMessageType(int value) {
        this.value = value;
    }

    /** Returns the numeric code of this message type. */
    public int value() {
        return value;
    }

    /**
     * Looks up the message type for a numeric code.
     *
     * @param type the numeric code
     * @return the matching message type
     * @throws IllegalArgumentException if no message type has that code
     */
    public static MqttMessageType valueOf(int type) {
        for (MqttMessageType candidate : values()) {
            if (candidate.value == type) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("unknown message type: " + type);
    }
}
2. Decode MQTT FixedHeader
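The maximum Remaining Length is 268,435,455 bytes (256 MiB − 1), the largest value that fits in the 4-byte variable-length encoding (4 × 7 bits).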
private static MqttFixedHeader decodeFixedHeader(ByteBuf buffer) { //read 1 byte short b1 = buffer.readUnsignedByte(); //mqtt type MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4); //dup flag if PUBLISH TYPE boolean dupFlag = (b1 & 0x08) == 0x08; //QoS if PUBLISH TYPE int qosLevel = (b1 & 0x06) >> 1; //RETAIN if PUBLISH TYPE boolean retain = (b1 & 0x01) != 0; //read length of payload int remainingLength = 0; int multiplier = 1; short digit; int loops = 0; do { //read 1 byte digit = buffer.readUnsignedByte(); // 127 = 0x7F remainingLength += (digit & 127) * multiplier; //128 = 0x80 multiplier *= 128; //max bytes is 4 loops++; } while ((digit & 128) != 0 && loops < 4); // MQTT protocol limits Remaining Length to 4 bytes if (loops == 4 && (digit & 128) != 0) { throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ‘)‘); } //construct MqttFixedHeader MqttFixedHeader decodedFixedHeader = new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength); return validateFixedHeader(resetUnusedFields(decodedFixedHeader)); }
3. Decode MQTT VariableHeader
private static Result<Integer> decodeMsbLsb(ByteBuf buffer, int min, int max) { short msbSize = buffer.readUnsignedByte(); short lsbSize = buffer.readUnsignedByte(); final int numberOfBytesConsumed = 2; int result = msbSize << 8 | lsbSize; if (result < min || result > max) { result = -1; } return new Result<Integer>(result, numberOfBytesConsumed);
private static Result<?> decodeVariableHeader(ByteBuf buffer, MqttFixedHeader mqttFixedHeader) { switch (mqttFixedHeader.messageType()) { case CONNECT: return decodeConnectionVariableHeader(buffer); case CONNACK: return decodeConnAckVariableHeader(buffer); case SUBSCRIBE: case UNSUBSCRIBE: case SUBACK: case UNSUBACK: case PUBACK: case PUBREC: case PUBCOMP: case PUBREL: return decodeMessageIdVariableHeader(buffer); case PUBLISH: return decodePublishVariableHeader(buffer, mqttFixedHeader); case PINGREQ: case PINGRESP: case DISCONNECT: // Empty variable header return new Result<Object>(null, 0); } //should never reach here return new Result<Object>(null, 0); }
3.1 If Connect Type
/**
 * Decodes a 16-bit unsigned integer stored as most-significant byte followed by
 * least-significant byte.
 *
 * @param buffer the buffer to read from; two bytes are always consumed
 * @param min    lower bound of the accepted range (inclusive)
 * @param max    upper bound of the accepted range (inclusive)
 * @return the decoded value, or {@code -1} if it falls outside the accepted range,
 *         paired with a consumed-byte count of 2
 */
private static Result<Integer> decodeMsbLsb(ByteBuf buffer, int min, int max) {
    final int high = buffer.readUnsignedByte();
    final int low = buffer.readUnsignedByte();
    final int combined = (high << 8) | low;
    final boolean inRange = combined >= min && combined <= max;
    return new Result<Integer>(inRange ? combined : -1, 2);
}
/**
 * Decodes a length-prefixed UTF-8 string: a two-byte length followed by that many bytes.
 *
 * <p>The string bytes are always skipped, even when the length is rejected, so the
 * reader index ends up after the field either way.
 *
 * @param buffer   the buffer to read from
 * @param minBytes smallest accepted byte length (inclusive)
 * @param maxBytes largest accepted byte length (inclusive)
 * @return the decoded string, or {@code null} when the length is outside
 *         {@code [minBytes, maxBytes]}, paired with the total bytes consumed
 *         (length prefix plus string bytes)
 */
private static Result<String> decodeString(ByteBuf buffer, int minBytes, int maxBytes) {
    final Result<Integer> lengthPrefix = decodeMsbLsb(buffer);
    final int length = lengthPrefix.value;
    final int consumed = lengthPrefix.numberOfBytesConsumed + length;

    final boolean lengthAccepted = length >= minBytes && length <= maxBytes;
    // toString(index, length, charset) does not move the reader index, hence the explicit skip below
    final String text = lengthAccepted
            ? buffer.toString(buffer.readerIndex(), length, CharsetUtil.UTF_8)
            : null;

    buffer.skipBytes(length);
    return new Result<String>(text, consumed);
}
private static Result<MqttConnectVariableHeader> decodeConnectionVariableHeader(ByteBuf buffer) { final Result<String> protoString = decodeString(buffer); int numberOfBytesConsumed = protoString.numberOfBytesConsumed; //read level final byte protocolLevel = buffer.readByte(); numberOfBytesConsumed += 1; //version final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel); //read Connect Flags final int b1 = buffer.readUnsignedByte(); numberOfBytesConsumed += 1; //read keepAlive final Result<Integer> keepAlive = decodeMsbLsb(buffer); numberOfBytesConsumed += keepAlive.numberOfBytesConsumed; final boolean hasUserName = (b1 & 0x80) == 0x80; final boolean hasPassword = (b1 & 0x40) == 0x40; final boolean willRetain = (b1 & 0x20) == 0x20; final int willQos = (b1 & 0x18) >> 3; final boolean willFlag = (b1 & 0x04) == 0x04; final boolean cleanSession = (b1 & 0x02) == 0x02; if (mqttVersion == MqttVersion.MQTT_3_1_1) { final boolean zeroReservedFlag = (b1 & 0x01) == 0x0; if (!zeroReservedFlag) { // MQTT v3.1.1: The Server MUST validate that the reserved flag in the CONNECT Control Packet is // set to zero and disconnect the Client if it is not zero. // See http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349230 throw new DecoderException("non-zero reserved flag"); } } final MqttConnectVariableHeader mqttConnectVariableHeader = new MqttConnectVariableHeader( mqttVersion.protocolName(), mqttVersion.protocolLevel(), hasUserName, hasPassword, willRetain, willQos, willFlag, cleanSession, keepAlive.value); return new Result<MqttConnectVariableHeader>(mqttConnectVariableHeader, numberOfBytesConsumed); }
3.2 If Publish Type
/**
 * Decodes the variable header of a PUBLISH message: the topic name, followed by a
 * message id when the QoS level in the fixed header is greater than zero.
 *
 * @param buffer          the buffer positioned at the start of the PUBLISH variable header
 * @param mqttFixedHeader the already decoded fixed header, consulted for the QoS level
 * @return the decoded header (message id is {@code -1} when none is present) and the
 *         number of bytes consumed
 * @throws DecoderException if the topic name is rejected by {@code isValidPublishTopicName}
 */
private static Result<MqttPublishVariableHeader> decodePublishVariableHeader(
        ByteBuf buffer, MqttFixedHeader mqttFixedHeader) {
    final Result<String> topic = decodeString(buffer);
    final String topicName = topic.value;
    if (!isValidPublishTopicName(topicName)) {
        throw new DecoderException("invalid publish topic name: " + topicName + " (contains wildcards)");
    }

    int consumed = topic.numberOfBytesConsumed;
    int packetId = -1;
    final boolean carriesMessageId = mqttFixedHeader.qosLevel().value() > 0;
    if (carriesMessageId) {
        final Result<Integer> idField = decodeMessageId(buffer);
        packetId = idField.value;
        consumed += idField.numberOfBytesConsumed;
    }

    return new Result<MqttPublishVariableHeader>(
            new MqttPublishVariableHeader(topicName, packetId), consumed);
}
3.3 If SUBSCRIBE:UNSUBSCRIBE:SUBACK:UNSUBACK:PUBACK:PUBREC:PUBCOMP:PUBREL Type
/**
 * Decodes a variable header that consists solely of a message id.
 *
 * @param buffer the buffer positioned at the message id
 * @return the message-id header and the number of bytes the id occupied
 */
private static Result<MqttMessageIdVariableHeader> decodeMessageIdVariableHeader(ByteBuf buffer) {
    final Result<Integer> idField = decodeMessageId(buffer);
    final MqttMessageIdVariableHeader header = MqttMessageIdVariableHeader.from(idField.value);
    return new Result<MqttMessageIdVariableHeader>(header, idField.numberOfBytesConsumed);
}
3.4 If CONNACK Type
private static Result<MqttConnAckVariableHeader> decodeConnAckVariableHeader(ByteBuf buffer) { final boolean sessionPresent = (buffer.readUnsignedByte() & 0x01) == 0x01; //ack return code byte returnCode = buffer.readByte(); final int numberOfBytesConsumed = 2; final MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(returnCode), sessionPresent); return new Result<MqttConnAckVariableHeader>(mqttConnAckVariableHeader, numberOfBytesConsumed); }
4. Payload Parser
4.1 If CONNECT Type
Each field uses the same format as a string: a two-byte length (MSB, then LSB) followed by the data.
/**
 * Decodes the payload of a CONNECT message. The client identifier is always read;
 * will topic and will message, user name, and password are read only when the
 * corresponding flag in the variable header is set, in that order.
 *
 * @param buffer                    the buffer positioned at the start of the CONNECT payload
 * @param mqttConnectVariableHeader the already decoded CONNECT variable header
 * @return the decoded payload (absent fields are {@code null}) and the bytes consumed
 * @throws MqttIdentifierRejectedException if {@code isValidClientId} rejects the client identifier
 */
private static Result<MqttConnectPayload> decodeConnectionPayload(
        ByteBuf buffer, MqttConnectVariableHeader mqttConnectVariableHeader) {
    final Result<String> clientIdField = decodeString(buffer);
    final String clientId = clientIdField.value;

    final MqttVersion version = MqttVersion.fromProtocolNameAndLevel(
            mqttConnectVariableHeader.name(), (byte) mqttConnectVariableHeader.version());
    if (!isValidClientId(version, clientId)) {
        throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + clientId);
    }

    int consumed = clientIdField.numberOfBytesConsumed;

    String willTopic = null;
    byte[] willMessage = null;
    if (mqttConnectVariableHeader.isWillFlag()) {
        final Result<String> willTopicField = decodeString(buffer, 0, 32767);
        consumed += willTopicField.numberOfBytesConsumed;
        willTopic = willTopicField.value;

        final Result<byte[]> willMessageField = decodeByteArray(buffer);
        consumed += willMessageField.numberOfBytesConsumed;
        willMessage = willMessageField.value;
    }

    String userName = null;
    if (mqttConnectVariableHeader.hasUserName()) {
        final Result<String> userNameField = decodeString(buffer);
        consumed += userNameField.numberOfBytesConsumed;
        userName = userNameField.value;
    }

    byte[] password = null;
    if (mqttConnectVariableHeader.hasPassword()) {
        final Result<byte[]> passwordField = decodeByteArray(buffer);
        consumed += passwordField.numberOfBytesConsumed;
        password = passwordField.value;
    }

    final MqttConnectPayload connectPayload =
            new MqttConnectPayload(clientId, willTopic, willMessage, userName, password);
    return new Result<MqttConnectPayload>(connectPayload, consumed);
}
4.2 If SUBSCRIBE Type
/**
 * Decodes the payload of a SUBSCRIBE message: a sequence of (topic name, requested QoS)
 * pairs that continues until the given number of bytes has been consumed.
 *
 * @param buffer                       the buffer positioned at the start of the payload
 * @param bytesRemainingInVariablePart number of payload bytes to decode
 * @return the list of topic subscriptions and the number of bytes consumed
 */
private static Result<MqttSubscribePayload> decodeSubscribePayload(
        ByteBuf buffer, int bytesRemainingInVariablePart) {
    final List<MqttTopicSubscription> subscriptions = new ArrayList<MqttTopicSubscription>();

    int consumed = 0;
    while (consumed < bytesRemainingInVariablePart) {
        final Result<String> topicField = decodeString(buffer);
        // only the two lowest bits of the options byte carry the QoS
        final int requestedQos = buffer.readUnsignedByte() & 0x03;
        consumed += topicField.numberOfBytesConsumed + 1;

        subscriptions.add(new MqttTopicSubscription(topicField.value, MqttQoS.valueOf(requestedQos)));
    }

    return new Result<MqttSubscribePayload>(new MqttSubscribePayload(subscriptions), consumed);
}
4.3 If SUBSCRIBE-ACK Type
/**
 * Decodes the payload of a SUBACK message: one byte per granted QoS.
 * A byte equal to {@code MqttQoS.FAILURE.value()} is kept as is; any other byte is
 * masked down to its two lowest bits.
 *
 * @param buffer                       the buffer positioned at the start of the payload
 * @param bytesRemainingInVariablePart number of payload bytes to decode
 * @return the list of granted QoS values and the number of bytes consumed
 */
private static Result<MqttSubAckPayload> decodeSubackPayload(
        ByteBuf buffer, int bytesRemainingInVariablePart) {
    final List<Integer> granted = new ArrayList<Integer>();

    int consumed = 0;
    for (; consumed < bytesRemainingInVariablePart; consumed++) {
        final int raw = buffer.readUnsignedByte();
        final int qos = (raw == MqttQoS.FAILURE.value()) ? raw : (raw & 0x03);
        granted.add(qos);
    }

    return new Result<MqttSubAckPayload>(new MqttSubAckPayload(granted), consumed);
}
4.4 If PUBLISH Type
The Payload contains the Application Message that is being published. The content and format of the data is application specific.
The length of the payload can be calculated by subtracting the length of the variable header from the Remaining Length field
that is in the Fixed Header. It is valid for a PUBLISH Packet to contain a zero length payload.
// Excerpt from the caller: the payload length is the remaining length from the fixed
// header minus the bytes already consumed by the variable header.
bytesRemainingInVariablePart =
        mqttFixedHeader.remainingLength() - mqttVariableHeader.numberOfBytesConsumed;

/**
 * Takes the PUBLISH payload as a retained slice of the given length.
 *
 * @param buffer                       the buffer positioned at the start of the payload
 * @param bytesRemainingInVariablePart number of payload bytes to slice off
 * @return the payload slice and the number of bytes consumed (the requested length)
 */
private static Result<ByteBuf> decodePublishPayload(ByteBuf buffer, int bytesRemainingInVariablePart) {
    // NOTE(review): the slice is obtained via readRetainedSlice, so the receiver
    // presumably has to release it - confirm against the consumer of this result.
    return new Result<ByteBuf>(
            buffer.readRetainedSlice(bytesRemainingInVariablePart), bytesRemainingInVariablePart);
}
以上是关于MQTT Implementation的主要内容,如果未能解决你的问题,请参考以下文章
接口自动化支持插件扩展,提供MQTT插件,MeterSphere开源持续测试平台v1.13.0发布
Linux C的MQTT测试代码编写 - 跨主机的MQTT客户端通信