使用 Spring Boot 优雅地整合 MQTT
Posted 逆流者blog
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用 Spring Boot 优雅地整合 MQTT 的相关知识,希望对你有一定的参考价值。
mqtt 有许多实现的产品,这里使用国内使用比较多的emqx,可以参考 emqx安装 来安装环境
emqx 官方有提供java示例来连接mqtt:传送门
下面我使用 Spring Boot 来优雅地整合一下
创建springboot 工程
pom
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the mqtt-client Spring Boot application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Versions of the Spring Boot starters and of Lombok are inherited from this parent. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- Project coordinates -->
    <groupId>top.wushanghui</groupId>
    <artifactId>mqtt-client</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>mqtt-client</name>
    <description>mqtt-client</description>

    <properties>
        <java.version>11</java.version>
    </properties>

    <dependencies>
        <!-- Web starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Generates metadata for @ConfigurationProperties classes -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- Test-only starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- Eclipse Paho MQTT v3 client; the version is pinned here explicitly -->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>

        <!-- Lombok: used for @Getter/@Setter/@ToString/@Slf4j in the sources -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
主要是引入mqtt依赖:
<!-- Eclipse Paho MQTT v3 client: the one dependency this integration adds on top of Spring Boot -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>
配置文件(yml)
application.yml
# Selects the active Spring profile; with "dev" active, application-dev.yml is loaded as well.
spring:
  profiles:
    active: dev
application-dev.yml
# Settings for the "dev" profile.
server:
  port: 8080

# Bound to EmqxProperties (prefix "emqx").
emqx:
  # Broker URI handed to the Paho MqttClient constructor.
  host: tcp://192.168.33.10:1883
  # Client id prefix; EmqxConfiguration appends the current time in milliseconds to it.
  clientId: MQTT_IOT_DEFAULT_CLIENT
  # Credentials are left empty here.
  username:
  password:
  # Passed to MqttConnectOptions#setConnectionTimeout, which Paho measures in SECONDS (not milliseconds).
  timeout: 1000
  # Passed to MqttConnectOptions#setKeepAliveInterval, also measured in seconds.
  keepAlive: 60
配置类
EmqxProperties
package top.wushanghui.mqttclient.config;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* @author 吴尚慧
* @since 2021/11/11 19:26
*/
@Getter
@Setter
@ToString
@ConfigurationProperties(prefix = "emqx")
public class EmqxProperties
private String host;
private String clientId;
private String username;
private String password;
private Integer timeout;
private Integer keepAlive;
EmqxConfiguration
package top.wushanghui.mqttclient.config;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import top.wushanghui.mqttclient.callback.MyPushCallback;
import top.wushanghui.mqttclient.enums.MqttTopicEnum;
@Slf4j
@Configuration
@EnableConfigurationProperties(EmqxProperties.class)
public class EmqxConfiguration
@Bean
public MqttClient mqttClient(EmqxProperties properties, MqttConnectOptions mqttConnectOptions, MyPushCallback myPushCallback) throws MqttException
log.info("", properties);
String cid = properties.getClientId() + System.currentTimeMillis();
MqttClient mqttClient = new MqttClient(properties.getHost(), cid, new MemoryPersistence());
mqttClient.setCallback(myPushCallback);
myPushCallback.setMqttClient(mqttClient);
mqttClient.connect(mqttConnectOptions);
log.info("连接 emqx 成功 当前客户端id=", mqttClient.getClientId());
mqttClient.subscribe(MqttTopicEnum.DEFAULT_TOPIC.getTopic(), MqttTopicEnum.DEFAULT_TOPIC.getQos());
log.info("mqttClient=", mqttClient);
log.info("myPushCallback=", myPushCallback);
return mqttClient;
@Bean
public MqttConnectOptions mqttConnectOptions(EmqxProperties properties)
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(properties.getUsername());
options.setPassword(properties.getPassword().toCharArray());
options.setConnectionTimeout(properties.getTimeout());
options.setKeepAliveInterval(properties.getKeepAlive());
options.setAutomaticReconnect(true);
return options;
上述代码向 Spring 容器中注册了 MqttClient 这个 Bean;如果需要发布消息,可以在业务类中直接注入 MqttClient 并调用:
mqttClient.publish(String topic, MqttMessage message)
枚举
MqttTopicEnum
package top.wushanghui.mqttclient.enums;
/**
 * MQTT topic filters used by the client, each paired with the QoS level to subscribe with.
 */
public enum MqttTopicEnum {

    /**
     * Default topic filter. It uses wildcards: {@code +} matches exactly one topic level,
     * {@code #} matches all remaining levels.
     */
    DEFAULT_TOPIC("sensor/+/#", 0);

    /** Topic filter. */
    private String topic;

    /** Quality of service level. */
    private Integer qos;

    MqttTopicEnum(String topic, Integer qos) {
        this.topic = topic;
        this.qos = qos;
    }

    /**
     * @return the topic filter
     */
    public String getTopic() {
        return topic;
    }

    /**
     * Replaces the topic filter.
     *
     * <p>Enum constants are shared, so the change is visible to every user of this constant.
     *
     * @param topic the new topic filter
     */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /**
     * @return the quality of service level
     */
    public Integer getQos() {
        return qos;
    }

    /**
     * Replaces the quality of service level.
     *
     * <p>Enum constants are shared, so the change is visible to every user of this constant.
     *
     * @param qos the new quality of service level
     */
    public void setQos(Integer qos) {
        this.qos = qos;
    }
}
回调(消费消息)
package top.wushanghui.mqttclient.callback;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.stereotype.Component;
import top.wushanghui.mqttclient.enums.MqttTopicEnum;
/**
* @author 吴尚慧
* @since 2021/11/11 20:36
*/
@Slf4j
@Component
public class MyPushCallback implements MqttCallbackExtended
private MqttClient mqttClient;
@Override
public void connectComplete(boolean reconnect, String serverURI)
log.info("MyPushCallback connectComplete reconnect=, serverURI=", reconnect, serverURI);
if (reconnect)
// 重新连接后,需重新订阅主题
try
log.info("mqttClient=", mqttClient);
mqttClient.subscribe(MqttTopicEnum.DEFAULT_TOPIC.getTopic(), MqttTopicEnum.DEFAULT_TOPIC.getQos());
catch (MqttException e)
log.error("订阅主题失败", e);
@Override
public void connectionLost(Throwable cause)
log.error("MyPushCallback connectionLost cause=", cause.toString());
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception
log.info("MyPushCallback messageArrived topic=, message=", topic, message);
@Override
public void deliveryComplete(IMqttDeliveryToken token)
log.info("MyPushCallback deliveryComplete token=", token);
public void setMqttClient(MqttClient mqttClient)
this.mqttClient = mqttClient;
在messageArrived方法中可以消费消息。
启动服务,用工具发个消息试一下:
在控制台就可以收到消息:
需要源码的小伙伴点击:传送门
以上是关于使用 Spring Boot 优雅地整合 MQTT 的主要内容,如果未能解决你的问题,请参考以下文章