解决 EMQX 4.3 规则引擎获取消息中文乱码
Posted Calvin Chan
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了解决 EMQX 4.3 规则引擎获取消息中文乱码相关的知识,希望对你有一定的参考价值。
解决方案
一、EMQX 规则引擎配置
该规则引擎为获取指定主题的所有信息,并转发到指定接口。如果未按上述解决方案设置编码,此处 select 获取到的中文为乱码。
二、订阅端代码
1、订阅端代码
package com.mqtt.client;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MyMqttClient
/**
* 服务器ip地址
*/
public static final String MQTT_BROKER_HOST = "tcp://localhost:1883";
/**
* 客户端唯一标识
*/
public static final String MQTT_CLIENT_ID = UUID.randomUUID().toString().substring(0, 8).toUpperCase();
/**
*
*/
public static final String USERNAME = "123456";
/**
* 密码
*/
public static final String PASSWORD = "123456";
/**
* 主题
*/
public static final String TOPIC_FILTER = "/wxhntmy/mqtt/#";
private volatile static MqttClient mqttClient;
private static MqttConnectOptions options;
public static void main(String[] args)
// TODO 自动生成的方法存根
try
// host为主机名,clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,
// MemoryPersistence设置clientid的保存形式,默认为以内存保存
mqttClient = new MqttClient(MQTT_BROKER_HOST, MQTT_CLIENT_ID, new MemoryPersistence());
// 配置参数信息
options = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
// 这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
// 设置用户名
options.setUserName(USERNAME);
// 设置密码
options.setPassword(PASSWORD.toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
// 连接
mqttClient.connect(options);
// 订阅
mqttClient.subscribe(TOPIC_FILTER);
// 设置回调
mqttClient.setCallback(new MyMqttCallback());
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String datef = sdf.format(date);
System.out.println("[" + datef + "] MQTT_Client Started!");
System.out.println("[" + datef + "] MQTT_Client Host: " + MQTT_BROKER_HOST);
System.out.println("[" + datef + "] MQTT_Client ID: " + MQTT_CLIENT_ID);
System.out.println("[" + datef + "] MQTT_Client Topic_Filter: " + TOPIC_FILTER);
catch (Exception e)
e.printStackTrace();
2、订阅端回调代码
package com.mqtt.client;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* MQTT 客户端回调
*
* @author wxhntmy
*
*/
public class MyMqttCallback implements MqttCallback
/**
* 连接丢失
*/
@Override
public void connectionLost(Throwable arg0)
// TODO 自动生成的方法存根
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String datef = sdf.format(date);
System.out.println("[" + datef + "] Connection Lost!");
/**
* 交付完成
*/
@Override
public void deliveryComplete(IMqttDeliveryToken arg0)
// TODO 自动生成的方法存根
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String datef = sdf.format(date);
System.out.println("[" + datef + "] Delivery Complete!");
/**
* 消息到达客户端
*/
@Override
public void messageArrived(String arg0, MqttMessage arg1) throws Exception
// TODO 自动生成的方法存根
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String datef = sdf.format(date);
//必须设置获取消息的字符编码,否则获取到的是乱码
String msgString = new String(arg1.getPayload(), "UTF-8");
System.out.println(
"[" + datef + "] Topic: " + arg0 + " Qos: " + arg1.getQos() + " Message: " + msgString);
三、发布端代码
package com.mqtt.publish;
import java.io.UnsupportedEncodingException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
* 发布端
*/
public class PublishSample
public static void main(String[] args) throws UnsupportedEncodingException
// TODO 自动生成的方法存根
// 主题
String topic = "/wxhntmy/mqtt";
// 内容
String content = "hello 哈哈";
int qos = 1;
String broker = "tcp://localhost:1883";
String userName = "123456";
String password = "123456";
String clientId = String.valueOf(System.currentTimeMillis());
// 内存存储
MemoryPersistence persistence = new MemoryPersistence();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String datef = sdf.format(date);
System.out.println("[" + datef + "] MQTT_Publish Started!");
System.out.println("[" + datef + "] MQTT_Publish Broker: " + broker);
System.out.println("[" + datef + "] MQTT_Publish ClientID: " + clientId);
System.out.println("[" + datef + "] MQTT_Publish Topic: " + topic);
System.out.println("[" + datef + "] MQTT_Publish Content: " + content);
// 创建客户端
MqttClient sampleClient = null;
try
sampleClient = new MqttClient(broker, clientId, persistence);
catch (MqttException e1)
// TODO 自动生成的 catch 块
e1.printStackTrace();
// 创建链接参数
MqttConnectOptions connOpts = new MqttConnectOptions();
try
// 在重新启动和重新连接时记住状态
connOpts.setCleanSession(false);
// 设置连接的用户名
connOpts.setUserName(userName);
connOpts.setPassword(password.toCharArray());
connOpts.setKeepAliveInterval(20);
// 建立连接
sampleClient.connect(connOpts);
// 创建消息
//规则引擎获取到的中文乱码原因在于这里,没有设置获取Bytes的字符编码
MqttMessage message = new MqttMessage(content.getBytes("UTF-8"));
// 设置消息的服务质量
message.setQos(qos);
String tp = topic + "/" + String.valueOf(System.currentTimeMillis());
while(true)
// 发布消息
sampleClient.publish(tp, message);
date = new Date();
sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
datef = sdf.format(date);
System.out.println("[" + datef + "] topic " + tp);
System.out.println("[" + datef + "] message " + message);
try
Thread.sleep(10 * 1000);
catch (InterruptedException e)
// TODO 自动生成的 catch 块
e.printStackTrace();
catch (MqttException me)
try
// 断开连接
sampleClient.disconnect();
// 关闭客户端
sampleClient.close();
catch (MqttException e)
// TODO 自动生成的 catch 块
e.printStackTrace();
date = new Date();
sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
datef = sdf.format(date);
System.out.println("[" + datef + "] reason " + me.getReasonCode());
System.out.println("[" + datef + "] msg " + me.getMessage());
System.out.println("[" + datef + "] loc " + me.getLocalizedMessage());
System.out.println("[" + datef + "] cause " + me.getCause());
System.out.println("[" + datef + "] excep " + me);
me.printStackTrace();
以上是关于解决 EMQX 4.3 规则引擎获取消息中文乱码的主要内容,如果未能解决你的问题,请参考以下文章
EMQ X 规则引擎系列 (八)桥接消息到 MQTT Broker
EMQX Cloud更新:数据集成新增 HStreamDB & Tablestore
EMQ X Enterprise 新功能 Rule Engine 介绍
EMQX v4.4.5 发布:新增排他订阅及 MQTT 5.0 发布属性支持
NanoMQ Newsletter 2022-06|规则引擎正式发布 & NanoSDK 支持 MQTT over QUIC