springboot结合mqtt服务器构建消息生产与消费示例

Posted luffy5459

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot结合mqtt服务器构建消息生产与消费示例相关的知识,希望对你有一定的参考价值。

    MQTT是一种轻量级的发布/订阅消息协议,mqtt服务器(broker)充当消息中间件,目前在物联网领域使用非常广泛。

    mqtt服务器的搭建,也有很多方式,最直接的就是使用docker启动emqx镜像。

docker run -d --name emqx -p 18083:18083 -p 1883:1883 emqx

    通过url访问webui。http://192.168.56.100:18083 用户名密码:admin/public

mqtt与springboot结合 

    引入maven依赖

<!-- Spring Boot parent POM; the dependencies below that omit <version> rely on it for version management. -->
<parent>
  	<groupId>org.springframework.boot</groupId>
  	<artifactId>spring-boot-starter-parent</artifactId>
  	<version>2.4.3</version>
  </parent>

  <dependencies>
    <!-- JUnit, test scope only. -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
    
    <!-- Web starter: used by the REST controller that triggers message publishing. -->
    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <!-- Spring Integration starter: channels, @ServiceActivator, @MessagingGateway. -->
    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    
    <!-- NOTE(review): no class in this article appears to use the stream module; confirm it is needed. -->
    <dependency>
    	<groupId>org.springframework.integration</groupId>
    	<artifactId>spring-integration-stream</artifactId>
    </dependency>
    
    <!-- MQTT channel adapters (MqttPahoMessageHandler, MqttPahoMessageDrivenChannelAdapter). -->
    <dependency>
    	<groupId>org.springframework.integration</groupId>
    	<artifactId>spring-integration-mqtt</artifactId>
    </dependency>
    
    <!-- Eclipse Paho MQTT v3 client, version pinned explicitly here. -->
    <dependency>
    	<groupId>org.eclipse.paho</groupId>
    	<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    	<version>1.2.5</version>
    </dependency>
    
  </dependencies>

    设置配置文件application.yml

# MQTT settings injected into MqttConfig through @Value("${mqtt.*}").
mqtt:
  # Broker URI (TCP listener on port 1883).
  # NOTE(review): the web UI URL earlier in this article uses 192.168.56.100 - confirm which host address is correct.
  serverURIs: tcp://192.168.226.100:1883
  # Credentials passed to the Paho connect options.
  username: admin
  password: public
  client:
    # Random value placeholder, so the client id is not hard-coded.
    id: ${random.value}
  # Default topic: used for publishing when no topic is given, and subscribed to by the inbound adapter.
  topic: topic_default

    设置配置类 MqttConfig.java

package com.xxx.mqttapp.config;
import java.util.Objects;
import javax.annotation.PostConstruct;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
 * Spring Integration wiring for MQTT.
 *
 * <p>Declares a Paho client factory built from the {@code mqtt.*} properties, an outbound
 * channel with a handler that publishes messages, and an inbound adapter that subscribes to
 * the default topic and forwards received messages to a handler that logs them.
 */
@IntegrationComponentScan
@Configuration
public class MqttConfig {
    public static final Logger log = LoggerFactory.getLogger(MqttConfig.class);

    /** Bean name of the channel that the publishing handler listens on. */
    public static final String OUTBOUND_CHANNEL = "mqttOutboundChannel";

    /** Bean name of the channel that receives messages from the inbound adapter. */
    public static final String INPUT_CHANNEL = "mqttInputChannel";

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.serverURIs}")
    private String hostUrl;

    @Value("${mqtt.client.id}")
    private String clientId;

    @Value("${mqtt.topic}")
    private String defaultTopic;

    /**
     * Logs the resolved connection settings once property injection has completed.
     */
    @PostConstruct
    public void init() {
        // Credentials are deliberately kept out of the log output; every remaining
        // argument has a matching placeholder.
        log.debug("username:{} hostUrl:{} clientId:{} defaultTopic:{}",
                this.username, this.hostUrl, this.clientId, this.defaultTopic);
    }

    /**
     * Builds the Paho client factory shared by the outbound handler and the inbound adapter.
     *
     * @return a factory configured with the broker URI, user name and password
     */
    @Bean
    public MqttPahoClientFactory clientFactory() {

        final MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{hostUrl});
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        final DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);
        return factory;
    }

    /**
     * Channel that carries messages to be published.
     *
     * @return a new {@link DirectChannel} registered under {@link #OUTBOUND_CHANNEL}
     */
    @Bean(value = OUTBOUND_CHANNEL)
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Handler that publishes messages arriving on {@link #OUTBOUND_CHANNEL}.
     *
     * @return the configured publishing handler (QoS 1, not retained, synchronous)
     */
    @Bean
    @ServiceActivator(inputChannel = OUTBOUND_CHANNEL)
    public MessageHandler mqttOutbound() {

        final MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId, clientFactory());
        handler.setDefaultQos(1);
        handler.setDefaultRetained(false);
        handler.setDefaultTopic(defaultTopic);
        handler.setAsync(false);
        handler.setAsyncEvents(false);
        return handler;
    }

    /**
     * Channel that receives messages from the inbound adapter.
     *
     * @return a new {@link DirectChannel}; the bean name is the method name, which equals
     *         {@link #INPUT_CHANNEL}
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Inbound adapter subscribed to the default topic.
     *
     * @return the adapter, which sends received messages to {@link #mqttInputChannel()}
     */
    @Bean
    public MessageProducer inbound() {
        // A distinct client id suffix keeps the subscriber separate from the publisher client.
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        clientId + "_inbound", clientFactory(), defaultTopic);
        adapter.setCompletionTimeout(3000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    /**
     * Consumer for messages arriving on {@link #INPUT_CHANNEL}; logs the topic and payload.
     *
     * @return a handler that throws {@link NullPointerException} when the
     *         {@code mqtt_receivedTopic} header is absent
     */
    @Bean
    @ServiceActivator(inputChannel = INPUT_CHANNEL)
    public MessageHandler handler() {
        return message -> {
            String topic = Objects.requireNonNull(message.getHeaders().get("mqtt_receivedTopic")).toString();
            log.info("topic: {},payload : {}", topic, message.getPayload());
        };
    }
}

    添加发送接口MqttGateway.java

package com.xxx.mqttapp.service;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

import com.xxx.mqttapp.config.MqttConfig;

/**
 * Messaging gateway for publishing to MQTT.
 *
 * <p>Calls are sent to the channel named by {@link MqttConfig#OUTBOUND_CHANNEL}.
 */
@MessagingGateway(defaultRequestChannel = MqttConfig.OUTBOUND_CHANNEL)
public interface MqttGateway {

    /**
     * Sends a payload to the outbound channel.
     *
     * @param topic   value placed in the {@link MqttHeaders#TOPIC} message header
     * @param payload message payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
}

    定义发送控制器

package com.xxx.mqttapp.web;
import java.util.HashMap;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.xxx.mqttapp.service.MqttGateway;
/**
 * REST controller that publishes a message through {@link MqttGateway}.
 */
@RequestMapping("/test")
@RestController
public class TestController {

    @Autowired
    private MqttGateway mqttGateway;

    /**
     * Handles {@code POST /test/sendMessage}: forwards the topic and payload to the gateway
     * and echoes both back to the caller.
     *
     * @param topic   topic to publish to (presumably bound from a request parameter of the
     *                same name; confirm against the build's parameter-name settings)
     * @param payload message payload (presumably bound the same way)
     * @return a map containing the {@code topic} and {@code payload} that were sent
     */
    @PostMapping("/sendMessage")
    public Map<String, Object> sendMessage(String topic, String payload) {
        mqttGateway.sendToMqtt(topic, payload);

        final Map<String, Object> echo = new HashMap<>();
        echo.put("topic", topic);
        echo.put("payload", payload);
        return echo;
    }
}

    springboot启动类

package com.xxx.mqttapp;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Application entry point.
 */
@SpringBootApplication
public class App {

    /**
     * Starts the Spring application with this class as its primary source.
     *
     * @param args command-line arguments passed through to {@link SpringApplication#run}
     */
    public static void main(final String[] args) {
        SpringApplication.run(App.class, args);
    }
}

    启动springboot应用,测试:

    消息发送成功,控制台打印消息如下:

    这个测试,不是直接发送消息到mqtt服务器,而是借助web接口,发送http请求,后台接收,然后调用生产者发送方法。

    消费者直接配置了一个MqttPahoMessageDrivenChannelAdapter,并订阅defaultTopic主题,它需要设置一个channel和一个handler。所以,生产者一发送消息过来,消费者立马就消费掉了。

以上是关于springboot结合mqtt服务器构建消息生产与消费示例的主要内容,如果未能解决你的问题,请参考以下文章

springboot整合RabbitMQ Mqtt

Springboot整合mqtt客户端实现发送与接收消息

Spring Boot MQTT Too many publishes in progress错误的解决方案

如何从 MQTT 生产并在 ActiveMQ 中作为 MQTT 和 JMS 消费

rabbitMQ应用,laravel生产广播消息,springboot消费消息

SpringBoot2.x集成MQTT实现消息订阅(附源码)