Integrating an MQTT client with Spring Boot to send and receive messages
Posted by 健康平安的活着
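1 Integrating MQTT with Spring Boot
1.1 Overview
If this Spring Boot + MQTT project implements only the publishing logic, the application merely opens a client connection to the broker; no topic shows up on the broker side. In that case, set up the topic on the broker first and then publish messages to it.
If the subscribing logic is implemented as well, the connection and the topic are both created automatically when the application starts.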
1.2 Project structure
1.3 The pom file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Inherit from the Spring Boot parent POM -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.6.RELEASE</version>
<relativePath/>
</parent>
<groupId>com.dky.guolu.mqtt</groupId>
<artifactId>dkygl-mqtt-client</artifactId>
<version>1.0-SNAPSHOT</version>
<name>dkygl-mqtt-client</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
<!-- Spring Boot web starter; the version is managed by the parent POM (2.2.6.RELEASE) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.16</version>
</dependency>
<!-- MQTT-related dependencies: start -->
<!-- all of the following are required -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- MQTT-related dependencies: end -->
</dependencies>
<build>
</build>
</project>
1.4 The application.yml configuration file
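server:
  port: 8081
spring:
  # MQTT settings
  mqtt:
    send:
      # completion timeout
      completionTimeout: 3000
      # username used to authenticate with the broker when publishing
      username: admin
      # password used to authenticate with the broker when publishing
      password: publish
      # address of the MQTT broker
      url: tcp://172.16.71.150:1883
      # client ID
      clientId: tmq-dky-0817
      # topic to publish to
      topic: /iot/0817/mq-dky-guolu
      #topic: my-test
      # keep-alive interval, in seconds
      keepAliveInterval: 20
      # connection timeout; Paho's setConnectionTimeout interprets this value in seconds
      connectionTimeout: 3000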
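1. Configuration notes:
host: the address of your MQTT broker
clientId: your client ID (any value, but it must be unique; to the broker, subscribers and publishers are all clients)
topic: the topic to subscribe to
qos: the QoS level
username: the user name, may be empty
password: the password, may be empty
timeout: the timeout duration
keepalive: see https://blog.csdn.net/solo_jm/article/details/103403534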
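Because the broker treats every connection as a separate client, the configuration class in section 1.5 derives two IDs from the single configured value by appending `_pub` and `_sub`. When several instances of the application connect to the same broker, adding a random suffix is one way to keep the IDs distinct. The following is a minimal sketch under that assumption; the `ClientIds` helper and its `unique` method are hypothetical and not part of the project.

```java
import java.util.UUID;

/** Hypothetical helper: builds client IDs that stay distinct across application instances. */
final class ClientIds {
    private ClientIds() {
    }

    /** Returns base + "_" + role + "_" followed by 8 random hexadecimal characters. */
    static String unique(String base, String role) {
        String random = UUID.randomUUID().toString().replace("-", "").substring(0, 8);
        return base + "_" + role + "_" + random;
    }

    public static void main(String[] args) {
        // "tmq-dky-0817" is the clientId value from application.yml
        System.out.println(ClientIds.unique("tmq-dky-0817", "pub"));
        System.out.println(ClientIds.unique("tmq-dky-0817", "sub"));
    }
}
```

MQTT 3.1.1 only requires brokers to accept client IDs of up to 23 bytes, so a shorter base or suffix may be needed if your broker enforces that limit.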
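2. QoS levels:
0: the message is sent only once, whether or not delivery succeeds
1: the message is resent until delivery succeeds, so it may be received more than once
2: the message is resent on failure, but it is guaranteed to be received only once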
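The three QoS levels can be captured in a small enum so that code validates the numeric value instead of passing a bare `int` around. This is only an illustrative sketch: the `QosLevel` type and its field names are made up for this example and are not part of Paho or Spring Integration.

```java
/** The three MQTT QoS levels with the delivery behaviour described above. */
enum QosLevel {
    AT_MOST_ONCE(0, true, false),    // sent once, may be lost
    AT_LEAST_ONCE(1, false, true),   // resent until acknowledged, may arrive more than once
    EXACTLY_ONCE(2, false, false);   // resent on failure, delivered once

    final int code;
    final boolean mayBeLost;
    final boolean mayBeDuplicated;

    QosLevel(int code, boolean mayBeLost, boolean mayBeDuplicated) {
        this.code = code;
        this.mayBeLost = mayBeLost;
        this.mayBeDuplicated = mayBeDuplicated;
    }

    /** Maps the numeric value used in configuration to a level, rejecting anything but 0, 1, 2. */
    static QosLevel fromCode(int code) {
        for (QosLevel level : values()) {
            if (level.code == code) {
                return level;
            }
        }
        throw new IllegalArgumentException("QoS must be 0, 1 or 2 but was " + code);
    }

    public static void main(String[] args) {
        for (QosLevel level : values()) {
            System.out.println(level.code + " " + level
                    + " lost=" + level.mayBeLost + " duplicated=" + level.mayBeDuplicated);
        }
    }
}
```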
1.5 The MQTT configuration class
Purpose: initializes the client, topic, publishing, and subscribing configuration.
package com.dky.guolu.mqtt.config;
import com.dky.guolu.mqtt.handler.MqttCallbackHandler;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.StringUtils;
/**
* @ClassName: MqttConfig
* @Description: MQTT configuration (producer and consumer)
* @Author: liujianfu
* @Date: 2021/08/17 10:22:23
* @Version: V1.0
**/
@Configuration
public class MqttConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(MqttConfig.class);
private static final byte[] WILL_DATA;
static {
WILL_DATA = "offline".getBytes();
}
/**
* Name of the channel used by the MQTT subscriber
*/
public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
/**
* Name of the channel used by the MQTT publisher
*/
public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
/**
* Username for the MQTT connection
*/
@Value("${spring.mqtt.send.username}")
private String username;
/**
* Password for the MQTT connection
*/
@Value("${spring.mqtt.send.password}")
private String password;
/**
* URL of the MQTT broker
*/
@Value("${spring.mqtt.send.url}")
private String hostUrl;
/**
* Base client ID; "_pub" and "_sub" are appended for the publisher and the subscriber
*/
@Value("${spring.mqtt.send.clientId}")
private String clientId;
/**
* Topic to publish to and subscribe to
*/
@Value("${spring.mqtt.send.topic}")
private String msgTopic;
/**
* Completion timeout
*/
@Value("${spring.mqtt.send.completionTimeout}")
private int completionTimeout ;
/**
* Keep-alive interval, in seconds
*/
@Value("${spring.mqtt.send.keepAliveInterval}")
private int keepAliveInterval;
/**
* Connection timeout, in seconds
*/
@Value("${spring.mqtt.send.connectionTimeout}")
private int connectionTimeout;
@Autowired
private MqttCallbackHandler mqttCallbackHandler;
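/**
* Creates the MqttConnectOptions bean (MQTT connection options).
*
* @author liujianfu
* @date 2021/8/17 10:34
* @return org.eclipse.paho.client.mqttv3.MqttConnectOptions
*/
@Bean
public MqttConnectOptions getSenderMqttConnectOptions(){
MqttConnectOptions options=new MqttConnectOptions();
// Set the username if one is configured, with leading and trailing whitespace removed
if(!username.trim().equals("")){
options.setUserName(username.trim());
}
// Set the password
options.setPassword(password.toCharArray());
// Wrap the broker URL in an array and set the broker address(es)
String[] uris={hostUrl};
options.setServerURIs(uris);
// Keep-alive interval, in seconds. The client pings the broker when the connection is otherwise idle;
// the broker treats the client as offline if it hears nothing for 1.5 times this interval (1.5 * 20 s here).
// Keep-alive on its own does not reconnect; setAutomaticReconnect below takes care of that.
options.setKeepAliveInterval(keepAliveInterval);
// A "last will" message is published by the broker if the connection to the client drops unexpectedly.
// None is configured here: WILL_DATA is defined above but never passed to options.setWill(...).
// Connection timeout, in seconds
options.setConnectionTimeout(connectionTimeout);
options.setCleanSession(true);
options.setAutomaticReconnect(true);
return options;
}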
/**
* Creates the MqttPahoClientFactory bean
*/
@Bean
public MqttPahoClientFactory senderMqttClientFactory() {
// Create the MQTT client factory
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
// Apply the connection options
factory.setConnectionOptions(getSenderMqttConnectOptions());
return factory;
}
/**
* Publisher: MQTT message channel (producer)
*/
@Bean(name = CHANNEL_NAME_OUT)
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/**
* Publisher: MQTT message handler (producer); binds the channel to the MqttPahoClientFactory
*
* @return {@link org.springframework.messaging.MessageHandler}
*/
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
public MessageHandler mqttOutbound() {
// Create the message handler
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
clientId+"_pub",
senderMqttClientFactory());
// Send messages asynchronously
messageHandler.setAsync(true);
// Default topic, used when a message carries no topic header
messageHandler.setDefaultTopic(msgTopic);
messageHandler.setDefaultRetained(false);
// 1. retained = false: a client that reconnects to the broker does not receive the latest message on the topic.
// 2. retained = true: a client that reconnects to the broker receives the latest message on the topic.
return messageHandler;
}
/************ Consumer (subscriber) side *****/
/**
* MQTT message channel (consumer)
*
*/
@Bean(name = CHANNEL_NAME_IN)
public MessageChannel mqttInboundChannel() {
return new DirectChannel();
}
/**
* MQTT subscription binding (consumer)
*
*/
@Bean
public MessageProducer inbound() {
LOGGER.info("topics:{}", msgTopic);
// Several topics can be subscribed to at once by passing more topic arguments
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(
clientId+"_sub", senderMqttClientFactory(), msgTopic);
adapter.setCompletionTimeout(completionTimeout);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(0);
// Set the channel that receives the subscribed messages
adapter.setOutputChannel(mqttInboundChannel());
return adapter;
}
/**
* MQTT message handler (consumer)
*
*/
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_IN)
public MessageHandler handler() {
return message -> {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
String payload = message.getPayload().toString();
mqttCallbackHandler.handle(topic,payload);
};
}
}
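The comments next to `setDefaultRetained` in `mqttOutbound()` describe retained messages: with the flag set, the broker keeps the latest message on a topic and hands it to clients that subscribe later. The toy model below illustrates that behaviour in plain Java. It is not a broker and uses no MQTT library; `ToyBroker` and its methods are invented for this sketch, and topics are matched by exact name only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy in-memory model of retained messages; exact-match topics, no wildcards, no networking. */
final class ToyBroker {
    private final Map<String, String> retained = new HashMap<>();
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    /** Delivers the payload to current subscribers and, if retain is set, stores it for later ones. */
    void publish(String topic, String payload, boolean retain) {
        if (retain) {
            retained.put(topic, payload);
        }
        for (Consumer<String> subscriber : subscribers.getOrDefault(topic, new ArrayList<>())) {
            subscriber.accept(payload);
        }
    }

    /** Registers a subscriber and immediately replays the retained message, if there is one. */
    void subscribe(String topic, Consumer<String> subscriber) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(subscriber);
        String last = retained.get(topic);
        if (last != null) {
            subscriber.accept(last);
        }
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.publish("status", "not retained", false);
        broker.subscribe("status", p -> System.out.println("late subscriber got: " + p)); // prints nothing
        broker.publish("config", "retained", true);
        broker.subscribe("config", p -> System.out.println("late subscriber got: " + p));
    }
}
```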
1.6 The sending gateway interface
package com.dky.guolu.mqtt.gateway;
import com.dky.guolu.mqtt.config.MqttConfig;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
@Component
@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT)
public interface MqSendMessageGateWay {
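/**
* Sends a message to the default topic
* @param data the message payload
*/
void sendToMqtt(String data);
/**
* Sends a message to the given MQTT topic
* @param topic the topic to publish to
* @param payload the message payload
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
/**
* Sends a message to the given MQTT topic with the given QoS
* @param topic the topic to publish to
* @param qos the QoS level (0, 1 or 2)
* @param payload the message payload
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);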
}
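The three gateway methods differ only in which headers they attach: a topic or QoS passed as a `@Header` argument overrides the default configured on the outbound handler, and a message without the header falls back to that default. The sketch below models this fallback in plain Java as a simplified illustration. `OutboundDefaults` is a hypothetical class, not Spring Integration code; the header names are the string values of `MqttHeaders.TOPIC` and `MqttHeaders.QOS`.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of per-message overrides: a header wins, otherwise the default applies. */
final class OutboundDefaults {
    static final String TOPIC_HEADER = "mqtt_topic"; // string value of MqttHeaders.TOPIC
    static final String QOS_HEADER = "mqtt_qos";     // string value of MqttHeaders.QOS

    private OutboundDefaults() {
    }

    /** Returns the topic header if present, otherwise the handler's default topic. */
    static String resolveTopic(Map<String, Object> headers, String defaultTopic) {
        Object topic = headers.get(TOPIC_HEADER);
        return topic != null ? topic.toString() : defaultTopic;
    }

    /** Returns the QoS header if it is an integer, otherwise the handler's default QoS. */
    static int resolveQos(Map<String, Object> headers, int defaultQos) {
        Object qos = headers.get(QOS_HEADER);
        return qos instanceof Integer ? (Integer) qos : defaultQos;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        // like sendToMqtt(data): no headers, so the default topic is used
        System.out.println(resolveTopic(headers, "/iot/0817/mq-dky-guolu"));
        // like sendToMqtt(topic, qos, payload): both headers override the defaults
        headers.put(TOPIC_HEADER, "my-test");
        headers.put(QOS_HEADER, 2);
        System.out.println(resolveTopic(headers, "/iot/0817/mq-dky-guolu") + " qos=" + resolveQos(headers, 0));
    }
}
```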
1.7 The subscription message handler class
package com.dky.guolu.mqtt.handler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* @ClassName: MqttCallbackHandler
* @Description: Handles messages received from subscribed topics
* @Author: liujianfu
* @Date: 2021/08/17 15:44:04
* @Version: V1.0
**/
@Service
public class MqttCallbackHandler {
private static final Logger logger = LoggerFactory.getLogger(MqttCallbackHandler.class);
public void handle(String topic, String payload){
logger.info("MqttCallbackHandler: {}---{}", topic, payload);
// Process the message according to its topic.
}
}
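The `handle` method leaves the per-topic processing open. One possible shape for it is a lookup table from topic to handler, sketched below in plain Java. `TopicDispatcher` is a hypothetical class for illustration only; it matches topics by exact name and does not support MQTT wildcards.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical dispatcher: routes a payload to the handler registered for its exact topic. */
final class TopicDispatcher {
    private final Map<String, Consumer<String>> handlers = new HashMap<>();

    /** Registers the handler for one topic, replacing any earlier handler for it. */
    void register(String topic, Consumer<String> handler) {
        handlers.put(topic, handler);
    }

    /** Returns true if a handler was found and invoked, false if the topic is unknown. */
    boolean dispatch(String topic, String payload) {
        Consumer<String> handler = handlers.get(topic);
        if (handler == null) {
            return false;
        }
        handler.accept(payload);
        return true;
    }

    public static void main(String[] args) {
        TopicDispatcher dispatcher = new TopicDispatcher();
        dispatcher.register("/iot/0817/mq-dky-guolu", p -> System.out.println("boiler data: " + p));
        dispatcher.dispatch("/iot/0817/mq-dky-guolu", "42");
        System.out.println("handled unknown topic: " + dispatcher.dispatch("other", "x"));
    }
}
```

In the project, `MqttCallbackHandler.handle(topic, payload)` could delegate to such a dispatcher and log the topics for which it returns false.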
1.8 Publishing (producer)
package com.dky.guolu.mqtt.controller;
import com.dky.guolu.mqtt.gateway.MqSendMessageGateWay;
import com.dky.guolu.mqtt.util.R;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
/**
* @ClassName: SendMessageController
* @Description: TODO
* @Author: liujianfu
* @Date: 2021/08/17 11:14:21
* @Version: V1.0
**/
@RestController
@RequestMapping(value = "/")
public class SendMessageController {
@Autowired
private MqSendMessageGateWay mqSendMessageGateWay;
@RequestMapping("/send")
@ResponseBody
public ResponseEntity<String> send(String data){
System.out.println("data:"+data);
mqSendMessageGateWay.sendToMqtt(data);
// return R.ok("OK");
return new ResponseEntity<>("OK", HttpStatus.OK);
}
/**
* Publishes to a topic supplied with the request
* @param topic the topic to publish to
* @param data the message payload
*/
@ResponseBody
@RequestMapping("/sendToTopic")
public ResponseEntity<String> send(String topic ,String data){
mqSendMessageGateWay.sendToMqtt(topic,data);
return new ResponseEntity<>("OK", HttpStatus.OK);
}
}
1.9 Utility class
package com.dky.guolu.mqtt.util;
import org.springframework.http.HttpStatus;
import java.util.HashMap;
import java.util.Map;
/**
* @ClassName: R
* @Description: TODO
* @Author: liujianfu
* @Date: 2021/08/16 14:58:55
* @Version: V1.0
**/
public class R extends HashMap<String, Object> {
private static final long serialVersionUID = 1L;
public R() {
put("code", 200);
put("msg", "success");
}
public static R error() {
return error(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Unknown error, please contact the administrator");
}
public static R error(String msg) {
return error(502, msg);
}
public static R error(int code, String msg) {
R r = new R();
r.put("code", code);
r.put("msg", msg);
return r;
}
public static R error(int code, String msg, String data) {
R r = new R();
r.put("code", code);
r.put("msg", msg);
r.put("data", data);
return r;
}
public static R ok(String msg) {
R r = new R();
r.put("msg", msg);
return r;
}
public static R ok(Map<String, Object> map) {
R r = new R();
r.putAll(map);
return r;
}
}
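1.10 Testing
Application entry class:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class App
{
public static void main( String[] args )
{
SpringApplication.run(App.class, args);
System.out.println("Startup complete!");
}
}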
1.10.1 Publishing a message from the publisher
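The controller in section 1.8 exposes `/send` and `/sendToTopic`, and both read their arguments from request parameters. The sketch below builds the corresponding request URLs with the parameters URL-encoded. It assumes the application runs on localhost with port 8081 from application.yml; `SendUrls` is a hypothetical helper, not part of the project.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Hypothetical helper that builds request URLs for the two controller endpoints. */
final class SendUrls {
    // Assumption: the application runs locally on the port configured in application.yml
    static final String BASE = "http://localhost:8081";

    private SendUrls() {
    }

    /** URL that publishes data to the default topic. */
    static String send(String data) {
        return BASE + "/send?data=" + encode(data);
    }

    /** URL that publishes data to the given topic. */
    static String sendToTopic(String topic, String data) {
        return BASE + "/sendToTopic?topic=" + encode(topic) + "&data=" + encode(data);
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(send("hello world"));
        System.out.println(sendToTopic("/iot/0817/mq-dky-guolu", "42"));
    }
}
```

The printed URLs can be opened in a browser or passed to any HTTP client once the application is running.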
1.10.2 The MQTT broker
A subscriber defined on the broker's web page:
1.10.3 A custom subscribing client