Sprngboot 整合 mqtt 完整代码示例

Posted 洛阳泰山

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Sprngboot 整合 mqtt 完整代码示例相关的知识,希望对你有一定的参考价值。

pom文件引入依赖

        <!--mqtt-->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.4</version>
        </dependency>

application.yml 加入配置

spring:
  application:
    name: test-porject
  mqtt:
    url: tcp://$MQTT_HOST:172.16.10.201:1883
    client-id: $spring.application.name
    topic:
      - $MQTT_TOPIC:/iot/#

配置类


import com.workface.fullymechanizemine.common.listener.MqttSubscribeListener;
import com.workface.fullymechanizemine.core.tool.utils.StringUtil;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
@EnableConfigurationProperties(MqttProperties.class)
public class MqttConfiguration 
    private static final Logger log = LoggerFactory.getLogger(MqttConfiguration.class);
    @Autowired
    private MqttProperties mqttProperties;

    public MqttConfiguration() 
    

    @Bean
    public MqttConnectOptions mqttConnectOptions() 
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setServerURIs(new String[]this.mqttProperties.getUrl());
        if (StringUtil.isNotBlank(this.mqttProperties.getUrl())) 
            connectOptions.setUserName(this.mqttProperties.getUsername());
        

        if (StringUtil.isNotBlank(this.mqttProperties.getPassword())) 
            connectOptions.setPassword(this.mqttProperties.getPassword().toCharArray());
        

        connectOptions.setKeepAliveInterval(60);
        return connectOptions;
    

    @Bean
    public IMqttClient mqttClient(MqttConnectOptions options) throws MqttException 
        IMqttClient mqttClient = new MqttClient(this.mqttProperties.getUrl(), this.mqttProperties.getClientId());
        mqttClient.connect(options);

        for(int x = 0; x < this.mqttProperties.getTopic().length; ++x) 
            mqttClient.subscribe(this.mqttProperties.getTopic()[x], new MqttSubscribeListener());
        

        return mqttClient;
    

MqttProperties属性类

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@Data
@ConfigurationProperties("spring.mqtt")
public class MqttProperties 
    private String url;
    private String clientId;
    private String username;
    private String password;
    private String[] topic;

MqttSubscribeListener 订阅监听类

import com.workface.fullymechanizemine.common.event.MqttSubscribeEvent;
import com.workface.fullymechanizemine.core.tool.utils.SpringUtil;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttMessage;


public class MqttSubscribeListener implements IMqttMessageListener 

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) 
        String content = new String(mqttMessage.getPayload());
        MqttSubscribeEvent event = new MqttSubscribeEvent(s, content);
        SpringUtil.publishEvent(event);

    

MqttEventListener 事件监听类

import com.baomidou.mybatisplus.core.toolkit.StringPool;
import com.workface.fullymechanizemine.common.config.mqtt.MqttProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import java.util.Arrays;
import java.util.List;


@Configuration
public class MqttEventListener 

    private static final Logger log = LoggerFactory.getLogger(MqttEventListener.class);

    @Resource
    private MqttProperties mqttProperties;

    private String processTopic (String topic) 
        List<String> topics = Arrays.asList(mqttProperties.getTopic());
        for (String wild : topics) 
            wild = wild.replace(StringPool.HASH, StringPool.EMPTY);
            if (topic.startsWith(wild)) 
                return topic.replace(wild, StringPool.EMPTY);
            
        
        return StringPool.EMPTY;
    

//    private static List<AcceptPointDTO> toPoints (Object source) 
//        String str = Objects.toString(source);
//        List<AcceptPointDTO> data = JSONArray.parseArray(str, AcceptPointDTO.class);
//        return data;
//    

//    @Async
//    @EventListener(MqttSubscribeEvent.class)
//    public void listen (MqttSubscribeEvent event) 
//        String topic = processTopic(event.getTopic());
//        Object source = event.getSource();
//        List<AcceptPointDTO> data = toPoints(source);
//        if (Func.isEmpty(data)) 
//            return;
//        
//        ConcurrentHashMap<String, WebSocketService> webSocketMap = WebSocketService.getWebSocketMap();
//        if (!Func.isEmpty(webSocketMap) && webSocketMap.size()>=1)
//            for (Map.Entry<String, WebSocketService> entry : webSocketMap.entrySet()) 
//                WebSocketService webSocketService = entry.getValue();
//                try 
//                    webSocketService.sendMessage(JSONObject.toJSONString(data));
//                 catch (IOException e) 
//                    e.printStackTrace();
//                
//            
//        
//        I_POINT_VALUE_SERVICE.savePointValue(data);
//    

MqttUtil 工具类

import com.workface.fullymechanizemine.core.tool.utils.SpringUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqttUtil 
    private static final Logger log = LoggerFactory.getLogger(MqttUtils.class);
    public MqttUtils() 
    

    public static IMqttClient getClient() 
        IMqttClient client = (IMqttClient) SpringUtil.getBean(IMqttClient.class);
        MqttConnectOptions options = (MqttConnectOptions)SpringUtil.getBean(MqttConnectOptions.class);
        if (!client.isConnected()) 
            log.info("client:" + client.getClientId() + "未连接,初始化连接");

            try 
                client.connect(options);
             catch (MqttException var3) 
                throw new RuntimeException("mqtt客户端连接失败", var3);
            
        

        return client;
    

    public static boolean publish(String topic, String message) 
        try 
            getClient().publish(topic, new MqttMessage(message.getBytes()));
            return true;
         catch (MqttException var3) 
            log.error("mqtt-message 发送失败", var3);
            return false;
        
    

    public static boolean subscribe(String topic, IMqttMessageListener listener) 
        try 
            getClient().subscribe(topic, listener);
            return true;
         catch (MqttException var3) 
            log.error("客户端订阅0失败", topic);
            return false;
        
    

SpringUtil 工具类

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.lang.Nullable;


public class SpringUtil implements ApplicationContextAware 
    private static final Logger log = LoggerFactory.getLogger(SpringUtil.class);
    private static ConfigurableApplicationContext context;

    public static DefaultListableBeanFactory getBeanFactory() 
        return (DefaultListableBeanFactory)context.getBeanFactory();
    

    public static <T> T getBean(Class<T> clazz) 
        return clazz == null ? null : context.getBean(clazz);
    

    public static <T> T getBean(String beanId) 
        return beanId == null ? null : (T) context.getBean(beanId);
    

    public static <T> T getBean(String beanName, Class<T> clazz) 
        if (null != beanName && !"".equals(beanName.trim())) 
            return clazz == null ? null : context.getBean(beanName, clazz);
         else 
            return null;
        
    

    public static ApplicationContext getContext() 
        return context == null ? null : context;
    

    public static void publishEvent(ApplicationEvent event) 
        if (context != null) 
            try 
                context.publishEvent(event);
             catch (Exception var2) 
                log.error(var2.getMessage());
            

        
    

    public static void registerBeanDefinition(String beanName, BeanDefinition definition) 
        getBeanFactory().registerBeanDefinition(beanName, definition);
    

    public void setApplicationContext(@Nullable ApplicationContext context) throws BeansException 
        SpringUtil.context = (ConfigurableApplicationContext)context;
    

    public static String getProperty(String prop) 
        return context.getEnvironment().getProperty(prop);
    

以上是关于Sprngboot 整合 mqtt 完整代码示例的主要内容,如果未能解决你的问题,请参考以下文章

Springboot 整合mqtt服务完整代码示例

SprngBoot整合Oracle报错:ORA-00911: 无效字符

使用sprintboot 优雅的整合 mqtt

Springboot整合mqtt客户端实现发送与接收消息

Android MQTT管理类完整代码

Android MQTT管理类完整代码