MQTT(EMQX)

Posted VipSoft

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MQTT(EMQX)相关的知识,希望对你有一定的参考价值。

POM

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>

Service.java

package com.vipsoft.mqtt;


import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.util.Scanner;

public class Service 

    public static void main(String[] args) throws Exception 
        String host = "tcp://172.16.3.88:1883";
        String topic = "VipSoft_MQTT";
        String clientId = "server_id"; // clientId不能重复这个是server的id
        //新建mqtt连接
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        //新建mqtt客户端
        MqttClient client = new MqttClient(host, clientId);
        client.connect(options);
        //新建mqtt消息
        MqttMessage message = new MqttMessage();

        @SuppressWarnings("resource")
        Scanner scanner = new Scanner(System.in);
        System.out.println("请输入要发送的内容:");
        while (true) 
            String MsgMessage= scanner.nextLine();
            message.setPayload(MsgMessage.getBytes());
            client.publish(topic, message);
        

    

Client.java

package com.vipsoft.mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

public class Client 

    public static void main(String[] args) throws Exception 
        String host = "tcp://172.16.3.88:1883";
        String topic = "VipSoft_MQTT";
        String clientId = "client_id";
        // 1.设置mqtt连接属性
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        // 2.实例化mqtt客户端
        MqttClient client = new MqttClient(host, clientId);
        // 3.连接
        client.connect(options);
        //这里的setCallback需要新建一个Callback类并实现 MqttCallback 这个类
        client.setCallback(new PushCallback());
        while (true) 
            client.subscribe(topic, 2);
         
    


PushCallback.java

package com.vipsoft.mqtt;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;

/**
 * 发布消息的回调类
 *
 * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
 * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
 * 在回调中,将它用来标识已经启动了该回调的哪个实例。
 * 必须在回调类中实现三个方法:
 *
 *  public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
 *
 *  public void connectionLost(Throwable cause)在断开连接时调用。
 *
 *  public void deliveryComplete(MqttDeliveryToken token))
 *  接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
 *  由 MqttClient.connect 激活此回调。
 *
 */
public class PushCallback implements MqttCallback 
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void connectionLost(Throwable cause) 
        // 连接丢失后进行重连
        System.out.println("连接断开,可以做重连");
        logger.info("掉线时间:", new Date());
    

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) 
        System.out.println("deliveryComplete---------" + token.isComplete());
    

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception 
        // subscribe后得到的消息会执行到这里面
        // System.out.println(message);
        System.out.println("接收消息主题 : " + topic);
        System.out.println("接收消息Qos : " + message.getQos());
        System.out.println("接收消息内容 : " + new String(message.getPayload()));
    


以上是关于MQTT(EMQX)的主要内容,如果未能解决你的问题,请参考以下文章

高度可扩展,EMQX 5.0 达成 1 亿 MQTT 连接

MQTT消息框架paho-mqtt与emqx安装部署与启动,python

mqtt开源服务器 EMQX 使用指南

如何使用 NodeJS mqtt、emqx 订阅所有主题/消息

如何在 Ubuntu 上安装 EMQX MQTT 服务器

MQTT(EMQX)