Springboot整合mqtt客户端实现发送与接收消息

Posted 健康平安的活着

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Springboot整合mqtt客户端实现发送与接收消息相关的知识,希望对你有一定的参考价值。

一 springboot整合mqtt

1.1 说明

springboot+mqtt 这个工程,只实现发布代码信息逻辑,则在服务端只能创建客户端连接,没有创建主题。这种情况就需要先在服务端创建好topic主题,然后往主题上发布信息

如果再实现了订阅代码信息逻辑,则在程序启动时,则会自动创建连接和topic主题

1.2 工程结构

1.3 pom文件

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <!-- 引入springboot 的父类-->
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.6.RELEASE</version>
    <relativePath/>
  </parent>
  <groupId>com.dky.guolu.mqtt</groupId>
  <artifactId>dkygl-mqtt-client</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>dkygl-mqtt-client</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.13</version>
      <scope>test</scope>
    </dependency>
    <!-- springBoot的启动器 -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
      <version>2.0.1.RELEASE</version>
    </dependency>
    <!-- lombok -->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.16.16</version>
    </dependency>
    <!--mqtt 相关依赖 start -->
    <!--下面几个都必须存在 -->
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-stream</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-mqtt</artifactId>
    </dependency>
    <!--mqtt 相关依赖 end -->
  </dependencies>


  <build>

  </build>
</project>

1.4 application.yml配置文件

server:
  port: 8081
spring:
  #mqtt配置
  mqtt:
    send:
      #完成超时时间
      completionTimeout: 3000
      #通过mqtt发送消息验证所需用户名
      username: admin
      #通过mqtt发送消息验证所需密码
      password: publish
      #连接的mqtt地址
      url: tcp://172.16.71.150:1883
      #客户端id
      clientId: tmq-dky-0817
      #推送主题
      topic: /iot/0817/mq-dky-guolu
      #topic: my-test
      keepAliveInterval: 20
      connectionTimeout: 3000

1.配置说明:
host:你的mqtt服务地址 clientId:你的客户端ID(随意填写,不能重复)(对mq服务来说,订阅者和发布者都是客户端)

topic:订阅的主题

qos:QoS username:用户名,可为空

password:密码,可为空

timeout:超时时长

keepalive: https://blog.csdn.net/solo_jm/article/details/103403534

2.qos:
0只会发送一次,不管成不成功

1未成功会继续发送,直到成功,可能会收到多次

2未成功会继续发送,但会保证只收到一次

1.5 mqtt的配置文件

功能:初始化客户端,主题,发布,订阅等配置信息

package com.dky.guolu.mqtt.config;


import com.dky.guolu.mqtt.handler.MqttCallbackHandler;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.StringUtils;

/**
 * @ClassName: MqttConfig
 * @Description: TODO  MQTT配置,生产者
 * @Author: liujianfu
 * @Date: 2021/08/17 10:22:23 
 * @Version: V1.0
 **/
@Configuration
public class MqttConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(MqttConfig.class);
    private static final byte[] WILL_DATA;
    static {
        WILL_DATA = "offline".getBytes();
    }
    /**
     * mqtt订阅者使用信道名称
     */
    public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
    /**
     * mqtt发布者信道名称
     */
    public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
    /**
     * mqtt发送者用户名
     */
    @Value("${spring.mqtt.send.username}")
    private String username;
    /**
     * mqtt发送者密码
     */
    @Value("${spring.mqtt.send.password}")
    private String password;
    /**
     * mqtt发送者url
     */
    @Value("${spring.mqtt.send.url}")
    private String hostUrl;
    /**
     * mqtt发送者客户端id
     */
    @Value("${spring.mqtt.send.clientId}")
    private String clientId;
    /**
     * mqtt发送者主题
     */
    @Value("${spring.mqtt.send.topic}")
    private String msgTopic;
    /**
     * mqtt发送者超时时间
     */
    @Value("${spring.mqtt.send.completionTimeout}")
    private int completionTimeout ;
    /**
    * @author liujianfu
    * @description
    */
    @Value("${spring.mqtt.send.keepAliveInterval}")
    private int keepAliveInterval;
    /**
     * @author liujianfu
     * @description
     */
    @Value("${spring.mqtt.send.connectionTimeout}")
    private int connectionTimeout;
    @Autowired
    private MqttCallbackHandler mqttCallbackHandler;
    /**
    * @author liujianfu
    * @description   新建MqttConnectionOptionsBean  MQTT连接器选项
    * @date 2021/8/17 10:34
    * @param []
    * @return org.eclipse.paho.client.mqttv3.MqttConnectOptions
    */
    @Bean
    public MqttConnectOptions getSenderMqttConnectOptions(){
        MqttConnectOptions options=new MqttConnectOptions();
        // 设置连接的用户名
        if(!username.trim().equals("")){
            //将用户名去掉前后空格
            options.setUserName(username);
        }
        // 设置连接的密码
        options.setPassword(password.toCharArray());
        // 转化连接的url地址
        String[] uris={hostUrl};
        // 设置连接的地址
        options.setServerURIs(uris);
        // 设置超时时间 单位为秒
        options.setConnectionTimeout(completionTimeout);
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
        // 但这个方法并没有重连的机制
        options.setKeepAliveInterval(keepAliveInterval);
        // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
        //设置超时时间
        options.setConnectionTimeout(connectionTimeout);
        options.setCleanSession(true);
        options.setAutomaticReconnect(true);
        return options;
    }
    /**
     *创建MqttPathClientFactoryBean
     */
    @Bean
    public MqttPahoClientFactory senderMqttClientFactory() {
        //创建mqtt客户端工厂
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        //设置mqtt的连接设置
        factory.setConnectionOptions(getSenderMqttConnectOptions());
        return factory;
    }
    /**
     * 发布者-MQTT信息通道(生产者)
     */
    @Bean(name = CHANNEL_NAME_OUT)
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }


    /**
     * 发布者-MQTT消息处理器(生产者)  将channel绑定到MqttClientFactory上
     *
     * @return {@link org.springframework.messaging.MessageHandler}
     */
    @Bean
    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
    public MessageHandler mqttOutbound() {
        //创建消息处理器
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
                clientId+"_pub",
                senderMqttClientFactory());
        //设置消息处理类型为异步
        messageHandler.setAsync(true);
        //设置消息的默认主题
        messageHandler.setDefaultTopic(msgTopic);
        messageHandler.setDefaultRetained(false);
        //1.重新连接MQTT服务时,不需要接收该主题最新消息,设置retained为false;
        //2.重新连接MQTT服务时,需要接收该主题最新消息,设置retained为true;
        return messageHandler;
    }

    /************                             消费者,订阅者的消费信息                               *****/

    /**
     * MQTT信息通道(消费者)
     *
     */
    @Bean(name = CHANNEL_NAME_IN)
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }
    /**
     * MQTT消息订阅绑定(消费者)
     *
     */
    @Bean
    public MessageProducer inbound() {
        System.out.println("topics:"+msgTopic);
        // 可以同时消费(订阅)多个Topic
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        clientId+"_sub", senderMqttClientFactory(), msgTopic);
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(0);
        // 设置订阅通道
        adapter.setOutputChannel(mqttInboundChannel());
        return adapter;
    }

    /**
     * MQTT消息处理器(消费者)
     *
     */
    @Bean
    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
    public MessageHandler handler() {
        return message -> {
            String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
            String payload = message.getPayload().toString();
            mqttCallbackHandler.handle(topic,payload);
        };
    }
}

 1.6 发送网关接口

package com.dky.guolu.mqtt.gateway;

import com.dky.guolu.mqtt.config.MqttConfig;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT)
public interface MqSendMessageGateWay {
    /**
     * 默认的消息机制
     * @param data
     */
    void sendToMqtt(String data);

    /**
     * 发送消息 向mqtt指定topic发送消息
     * @param topic
     * @param payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * 发送消息 向mqtt指定topic发送消息
     * @param topic
     * @param qos
     * @param payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

1.7 消息订阅消费类

package com.dky.guolu.mqtt.handler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * @ClassName: MqttCallbackHandle
 * @Description: TODO
 * @Author: liujianfu
 * @Date: 2021/08/17 15:44:04 
 * @Version: V1.0
 **/
@Service
public class MqttCallbackHandler {
    private static final Logger logger = LoggerFactory.getLogger(MqttCallbackHandler.class);
    public void handle(String topic, String payload){
        logger.info("MqttCallbackHandle:" + topic + "---"+ payload);
        // 根据topic分别进行消息处理。

    }
}

1.8 发布生产

package com.dky.guolu.mqtt.controller;

import com.dky.guolu.mqtt.gateway.MqSendMessageGateWay;
import com.dky.guolu.mqtt.util.R;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName: SendMessageController
 * @Description: TODO
 * @Author: liujianfu
 * @Date: 2021/08/17 11:14:21 
 * @Version: V1.0
 **/
@RestController
@RequestMapping(value = "/")
public class SendMessageController {
    @Autowired
   private MqSendMessageGateWay  mqSendMessageGateWay;
    @RequestMapping("/send")
    @ResponseBody
    private ResponseEntity<String> send(String data){
        System.out.println("data:"+data);
        mqSendMessageGateWay.sendToMqtt(data);
       // return R.ok("OK");
        return new ResponseEntity<>("OK", HttpStatus.OK);
    }
    /**
     * 动态增加主题
     * @param topic
     * @param data
     */
    @ResponseBody
    @RequestMapping("/sendToTopic")
    private ResponseEntity<String> send(String topic ,String data){
        mqSendMessageGateWay.sendToMqtt(topic,data);
        return new ResponseEntity<>("OK", HttpStatus.OK);
    }
}

1.9 工具类

package com.dky.guolu.mqtt.util;


import org.springframework.http.HttpStatus;

import java.util.HashMap;
import java.util.Map;

/**
 * @ClassName: R
 * @Description: TODO
 * @Author: liujianfu
 * @Date: 2021/08/16 14:58:55 
 * @Version: V1.0
 **/
public class R extends HashMap<String, Object> {
    private static final long serialVersionUID = 1L;

    public R() {
        put("code", 200);
        put("msg", "success");
    }

    public static R error() {
        return error(HttpStatus.INTERNAL_SERVER_ERROR.value(), "未知异常,请联系管理员");
    }

    public static R error(String msg) {
        return error(502, msg);
    }

    public static R error(int code, String msg) {
        R r = new R();
        r.put("code", code);
        r.put("msg", msg);
        return r;
    }

    public static R error(int code, String msg, String data) {
        R r = new R();
        r.put("code", code);
        r.put("msg", msg);
        r.put("data", data);
        return r;
    }

    public static R ok(String msg) {
        R r = new R();
        r.put("msg", msg);
        return r;
    }

    public static R ok(Map<String, Object> map) {
        R r = new R();
        r.putAll(map);
        return r;
    }
}

1.10 测试

启动类:

@SpringBootApplication
public class App 
{
    public static void main( String[] args )
    {
        SpringApplication.run(App.class, args);
        System.out.println("启动完成!!!");
    }
}

1.10.1 发布者发布信息

 1.10.2  mqtt服务端

 页面自定义订阅端:

  1.10.3  自定义订阅的客户端

https://gitee.com/jurf-liu/dkygl-mqtt-client.git 

以上是关于Springboot整合mqtt客户端实现发送与接收消息的主要内容,如果未能解决你的问题,请参考以下文章

springboot整合RabbitMQ Mqtt

使用sprintboot 优雅的整合 mqtt

物联网架构成长之路(32)-SpringBoot集成MQTT客户端

Springboot 整合mqtt服务完整代码示例

开发经验springboot整合websocket实现群聊

springboot整合系列