springboot下mqtt简单使用

Posted 好大的月亮

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot下mqtt简单使用相关的知识,希望对你有一定的参考价值。

概述

mqtt是一个物联网协议。
简单来说它由三大块组成:

  1. 服务端(转发消息的server)
  2. 生产者(发送消息)
  3. 消费者(接收消息)

简单来说就是一个轻量级的收发消息的东西。发消息的人也可以消费消息。
消息由中间的消息服务器转发给对应的订阅者。

代码demo

下面是自己运行的demo,copy的官网
https://www.emqx.io/docs/zh/v5.0/development/java.html#paho-java-%E4%BD%BF%E7%94%A8%E7%A4%BA%E4%BE%8B

package com.fchan;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * ClassName: MqttClient
 * Description:
 * date: 2022/11/1 18:01
 *
 * @author fchen
 */
public class MqttSendClient 



    public static void main(String[] args)
        String subTopic = "nb_fchan/#";
        String pubTopic = "nb_fchan/1";
        String content = "Hello World";
        int qos = 2;
        String broker = "tcp://broker.emqx.io:1883";
        String clientId = "emqx_test";
        MemoryPersistence persistence = new MemoryPersistence();

        try 
            MqttClient client = new MqttClient(broker, clientId, persistence);

            // MQTT 连接选项
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName("emqx_test");
            connOpts.setPassword("emqx_test_password".toCharArray());
            // 保留会话
            connOpts.setCleanSession(true);

            // 设置回调
            client.setCallback(new ClientOnMessageCallback());

            // 建立连接
            System.out.println("Connecting to broker: " + broker);
            client.connect(connOpts);

            System.out.println("Connected");
            System.out.println("Publishing message: " + content);

            // 订阅
            client.subscribe(subTopic);

            // 消息发布所需参数
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            client.publish(pubTopic, message);
            System.out.println("Message published");

            client.disconnect();
            System.out.println("Disconnected");
            client.close();
            System.exit(0);
         catch (MqttException me) 
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        
    










package com.fchan;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * ClassName: OnMessageCallback
 * Description:
 * date: 2022/11/1 18:06
 *
 * @author fchen
 */
public class ClientOnMessageCallback implements MqttCallback 
    @Override
    public void connectionLost(Throwable cause) 
        // 连接丢失后,一般在这里面进行重连
        System.out.println("连接断开,可以做重连");
    

    /**
     * QoS0,At most once,至多一次;
     * QoS1,At least once,至少一次;
     * QoS2,Exactly once,确保只有一次。
     * @param topic
     * @param message
     * @throws Exception
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception 
        // subscribe后得到的消息会执行到这里面
        System.out.println("接收消息主题:" + topic);
        System.out.println("接收消息Qos:" + message.getQos());
        System.out.println("接收消息内容:" + new String(message.getPayload()));
    

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) 
        System.out.println("deliveryComplete---------" + token.isComplete());
    

以上是关于springboot下mqtt简单使用的主要内容,如果未能解决你的问题,请参考以下文章

springboot当中使用EMQX(MQTT协议)

基于(springboot+vue+mqtt协议)的智能家居系统

使用JMeter测试MQTT协议

SpringBoot2.x集成MQTT实现消息订阅(附源码)

springboot结合mqtt服务器构建消息生产与消费示例

SpringBoot2集成MQTT 实现消息的发布订阅