Thingsboard:MQTT-订阅内部代理失败(Java/Paho)

Posted

技术标签:

【中文标题】Thingsboard:MQTT-订阅内部代理失败(Java/Paho)【英文标题】:Thingsboard: MQTT-Subscription to internal broker failed (Java/Paho) 【发布时间】:2022-01-20 21:07:49 【问题描述】:

我在订阅主题 v1/devices/me/telemetry 时遇到了一些问题。我在使用 paho Java-MQTT-Client 订阅 v1/devices/me/attributes没有问题。在属性主题中,当我在 UI 中发布新属性时,我可以获得它们。所以我的 Java 程序似乎运行良好(见底部)。

我在控制台得到以下信息:

Subscriber running  
checking  
Mqtt Connecting to broker: tcp://192.168.1.25:1883  
Mqtt Connected  
MqttException (128)  
MqttException (128)  
at org.eclipse.paho.client.mqttv3.MqttClient.subscribe(MqttClient.java:438)  
at org.eclipse.paho.client.mqttv3.MqttClient.subscribe(MqttClient.java:406)  
at Test.MqttSubscriber.subscribe(MqttSubscriber.java:57)  
at Test.MqttSubscriber.main(MqttSubscriber.java:30)

我猜错误代码 128 表示订阅被撤回。

我做错了什么?将内容发布到该主题的 thingsboard 是没有问题的。我是否必须以某种方式激活代理才能发布/订阅? TB 的内部代理是否需要特殊命令(可能是 JSON)来授予订阅?还是我必须通过物联网网关来实现它(我理解它就像 TB 可以将数据推送到外部代理的方式 - 但这里需要一个简单的订阅)?我必须使用 MQTT 从 Thingsboard 获取设备遥测的哪种替代方案?

希望有人能帮忙:)谢谢!

代码是(MqttSubscriber.java):

 package Test;

    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    public class MqttSubscriber implements MqttCallback  


    private static final String brokerUrl ="tcp://192.168.1.25:1883"; //Broker


    private static final String clientId = "test"; //Client-ID


    private static final String topic = "v1/devices/me/telemetry"; //Topic

    private static final String user = "AT2"; // Accesstoken/User from Device in TB!

    private static final String pw = "test";

    private static final char[] password = pw.toCharArray();

    public static void main(String[] args) 

    System.out.println("Subscriber running");

    new MqttSubscriber().subscribe(topic);
    

    public void subscribe(String topic)  

    MemoryPersistence persistence = new MemoryPersistence();

    try
    

        MqttClient sampleClient = new MqttClient(brokerUrl, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        
        
        
        connOpts.setCleanSession(true);
        connOpts.setUserName(user);
        connOpts.setPassword(password);

        System.out.println("checking");
        System.out.println("Mqtt Connecting to broker: " + brokerUrl);

        sampleClient.connect(connOpts);
        if (sampleClient.isConnected()==true) System.out.println("Mqtt Connected");
        else System.out.println("could not connect");

        sampleClient.setCallback(this);
        sampleClient.subscribe(topic);
        
        
        

        System.out.println("Subscribed");
        System.out.println("Listening");

         catch (MqttException me) 
        System.out.println(me);
        me.printStackTrace();
        
        

        //Called when the client lost the connection to the broker
        public void connectionLost(Throwable arg0) 
    
        

        //Called when a outgoing publish is complete
        public void deliveryComplete(IMqttDeliveryToken arg0) 

        

        public void messageArrived(String topic, MqttMessage message) throws Exception 

    
        System.out.println("| Topic:" + topic);
        System.out.println("| Message: " +message.toString());
        System.out.println("-------------------------------------------------");

        

        

【问题讨论】:

【参考方案1】:

据我所知,问题在于不满意的 QoS 级别。 没有 QoS 参数的订阅默认为 QoS == 1。如果请求的主题不支持此 QoS,则客户端将引发此异常。 Paho 客户端的摘录,您对 subscribe(topic) 的调用将级联到此 subscribe 方法:

    public void subscribe(String[] topicFilters, int[] qos, IMqttMessageListener[] messageListeners) throws MqttException  
        IMqttToken tok = aClient.subscribe(topicFilters, qos, null, null, messageListeners);
        tok.waitForCompletion(getTimeToWait());
        int[] grantedQos = tok.getGrantedQos();
        for (int i = 0; i < grantedQos.length; ++i) 
            qos[i] = grantedQos[i];
        
        if (grantedQos.length == 1 && qos[0] == 0x80) 
            throw new MqttException(MqttException.REASON_CODE_SUBSCRIBE_FAILED);
        
    

因此,您必须检查所请求主题的 QoS 级别并使用该 QoS 级别进行订阅。因为 QoS 1 被拒绝,我假设该主题是使用 QoS 0 发布的。

【讨论】:

以上是关于Thingsboard:MQTT-订阅内部代理失败(Java/Paho)的主要内容,如果未能解决你的问题,请参考以下文章

Thingsboard连接MQTT设备

ThingsBoard IOT Gateway

Thingsboard源码分析-MQTT设备连接协议(下)

Python通过MQTT协议上传物联网数据给ThingsBoard

Python通过MQTT协议上传物联网数据给ThingsBoard

Thingsboard 3.0 通过 tb-gateway 网关接入 MQTT 设备教程