Spring Boot 整合 MQTT 完整代码示例
Posted 洛阳泰山
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了 Spring Boot 整合 MQTT 完整代码示例相关的知识,希望对你有一定的参考价值。
pom文件引入依赖
<!--mqtt-->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.4</version>
</dependency>
application.yml 加入配置
spring:
  application:
    name: test-porject
  mqtt:
    # Broker URI; the host is taken from MQTT_HOST and falls back to 172.16.10.201.
    url: tcp://${MQTT_HOST:172.16.10.201}:1883
    # The client id reuses the application name declared above.
    client-id: ${spring.application.name}
    # Topic filters to subscribe to; MQTT_TOPIC overrides the default /iot/# filter.
    topic:
      - ${MQTT_TOPIC:/iot/#}
配置类
import com.workface.fullymechanizemine.common.listener.MqttSubscribeListener;
import com.workface.fullymechanizemine.core.tool.utils.StringUtil;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableConfigurationProperties(MqttProperties.class)
public class MqttConfiguration
private static final Logger log = LoggerFactory.getLogger(MqttConfiguration.class);
@Autowired
private MqttProperties mqttProperties;
public MqttConfiguration()
@Bean
public MqttConnectOptions mqttConnectOptions()
MqttConnectOptions connectOptions = new MqttConnectOptions();
connectOptions.setServerURIs(new String[]this.mqttProperties.getUrl());
if (StringUtil.isNotBlank(this.mqttProperties.getUrl()))
connectOptions.setUserName(this.mqttProperties.getUsername());
if (StringUtil.isNotBlank(this.mqttProperties.getPassword()))
connectOptions.setPassword(this.mqttProperties.getPassword().toCharArray());
connectOptions.setKeepAliveInterval(60);
return connectOptions;
@Bean
public IMqttClient mqttClient(MqttConnectOptions options) throws MqttException
IMqttClient mqttClient = new MqttClient(this.mqttProperties.getUrl(), this.mqttProperties.getClientId());
mqttClient.connect(options);
for(int x = 0; x < this.mqttProperties.getTopic().length; ++x)
mqttClient.subscribe(this.mqttProperties.getTopic()[x], new MqttSubscribeListener());
return mqttClient;
MqttProperties属性类
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
@Data
@ConfigurationProperties("spring.mqtt")
public class MqttProperties
private String url;
private String clientId;
private String username;
private String password;
private String[] topic;
MqttSubscribeListener 订阅监听类
import com.workface.fullymechanizemine.common.event.MqttSubscribeEvent;
import com.workface.fullymechanizemine.core.tool.utils.SpringUtil;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class MqttSubscribeListener implements IMqttMessageListener
@Override
public void messageArrived(String s, MqttMessage mqttMessage)
String content = new String(mqttMessage.getPayload());
MqttSubscribeEvent event = new MqttSubscribeEvent(s, content);
SpringUtil.publishEvent(event);
MqttEventListener 事件监听类
import com.baomidou.mybatisplus.core.toolkit.StringPool;
import com.workface.fullymechanizemine.common.config.mqtt.MqttProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.List;
@Configuration
public class MqttEventListener
private static final Logger log = LoggerFactory.getLogger(MqttEventListener.class);
@Resource
private MqttProperties mqttProperties;
private String processTopic (String topic)
List<String> topics = Arrays.asList(mqttProperties.getTopic());
for (String wild : topics)
wild = wild.replace(StringPool.HASH, StringPool.EMPTY);
if (topic.startsWith(wild))
return topic.replace(wild, StringPool.EMPTY);
return StringPool.EMPTY;
// private static List<AcceptPointDTO> toPoints (Object source)
// String str = Objects.toString(source);
// List<AcceptPointDTO> data = JSONArray.parseArray(str, AcceptPointDTO.class);
// return data;
//
// @Async
// @EventListener(MqttSubscribeEvent.class)
// public void listen (MqttSubscribeEvent event)
// String topic = processTopic(event.getTopic());
// Object source = event.getSource();
// List<AcceptPointDTO> data = toPoints(source);
// if (Func.isEmpty(data))
// return;
//
// ConcurrentHashMap<String, WebSocketService> webSocketMap = WebSocketService.getWebSocketMap();
// if (!Func.isEmpty(webSocketMap) && webSocketMap.size()>=1)
// for (Map.Entry<String, WebSocketService> entry : webSocketMap.entrySet())
// WebSocketService webSocketService = entry.getValue();
// try
// webSocketService.sendMessage(JSONObject.toJSONString(data));
// catch (IOException e)
// e.printStackTrace();
//
//
//
// I_POINT_VALUE_SERVICE.savePointValue(data);
//
MqttUtil 工具类
import com.workface.fullymechanizemine.core.tool.utils.SpringUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MqttUtil
private static final Logger log = LoggerFactory.getLogger(MqttUtils.class);
public MqttUtils()
public static IMqttClient getClient()
IMqttClient client = (IMqttClient) SpringUtil.getBean(IMqttClient.class);
MqttConnectOptions options = (MqttConnectOptions)SpringUtil.getBean(MqttConnectOptions.class);
if (!client.isConnected())
log.info("client:" + client.getClientId() + "未连接,初始化连接");
try
client.connect(options);
catch (MqttException var3)
throw new RuntimeException("mqtt客户端连接失败", var3);
return client;
public static boolean publish(String topic, String message)
try
getClient().publish(topic, new MqttMessage(message.getBytes()));
return true;
catch (MqttException var3)
log.error("mqtt-message 发送失败", var3);
return false;
public static boolean subscribe(String topic, IMqttMessageListener listener)
try
getClient().subscribe(topic, listener);
return true;
catch (MqttException var3)
log.error("客户端订阅0失败", topic);
return false;
SpringUtil 工具类
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.lang.Nullable;
public class SpringUtil implements ApplicationContextAware
private static final Logger log = LoggerFactory.getLogger(SpringUtil.class);
private static ConfigurableApplicationContext context;
public static DefaultListableBeanFactory getBeanFactory()
return (DefaultListableBeanFactory)context.getBeanFactory();
public static <T> T getBean(Class<T> clazz)
return clazz == null ? null : context.getBean(clazz);
public static <T> T getBean(String beanId)
return beanId == null ? null : (T) context.getBean(beanId);
public static <T> T getBean(String beanName, Class<T> clazz)
if (null != beanName && !"".equals(beanName.trim()))
return clazz == null ? null : context.getBean(beanName, clazz);
else
return null;
public static ApplicationContext getContext()
return context == null ? null : context;
public static void publishEvent(ApplicationEvent event)
if (context != null)
try
context.publishEvent(event);
catch (Exception var2)
log.error(var2.getMessage());
public static void registerBeanDefinition(String beanName, BeanDefinition definition)
getBeanFactory().registerBeanDefinition(beanName, definition);
public void setApplicationContext(@Nullable ApplicationContext context) throws BeansException
SpringUtil.context = (ConfigurableApplicationContext)context;
public static String getProperty(String prop)
return context.getEnvironment().getProperty(prop);
以上是关于 Spring Boot 整合 MQTT 完整代码示例的主要内容,如果未能解决你的问题,请参考以下文章。