解决 EMQX 4.3 规则引擎获取消息中文乱码

Posted Calvin Chan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了解决 EMQX 4.3 规则引擎获取消息中文乱码相关的知识,希望对你有一定的参考价值。

解决方案


一、EMQX 规则引擎配置

该规则引擎为获取指定主题的所有信息,并转发到指定接口。如果未按上述解决方案设置编码,此处 select 获取到的中文为乱码。

二、订阅端代码

1、订阅端代码

package com.mqtt.client;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MyMqttClient 
	/**
	 * 服务器ip地址
	 */
	public static final String MQTT_BROKER_HOST = "tcp://localhost:1883";

	/**
	 * 客户端唯一标识
	 */
	public static final String MQTT_CLIENT_ID = UUID.randomUUID().toString().substring(0, 8).toUpperCase();

	/**
	 *
	 */
	public static final String USERNAME = "123456";
	/**
	 * 密码
	 */
	public static final String PASSWORD = "123456";

	/**
	 * 主题
	 */
	public static final String TOPIC_FILTER = "/wxhntmy/mqtt/#";

	private volatile static MqttClient mqttClient;
	private static MqttConnectOptions options;

	public static void main(String[] args) 
		// TODO 自动生成的方法存根
		try 
			// host为主机名,clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,
			// MemoryPersistence设置clientid的保存形式,默认为以内存保存

			mqttClient = new MqttClient(MQTT_BROKER_HOST, MQTT_CLIENT_ID, new MemoryPersistence());
			// 配置参数信息
			options = new MqttConnectOptions();
			// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
			// 这里设置为true表示每次连接到服务器都以新的身份连接
			options.setCleanSession(true);
			// 设置用户名
			options.setUserName(USERNAME);
			// 设置密码
			options.setPassword(PASSWORD.toCharArray());
			// 设置超时时间 单位为秒
			options.setConnectionTimeout(10);
			// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
			options.setKeepAliveInterval(20);
			// 连接
			mqttClient.connect(options);
			// 订阅
			mqttClient.subscribe(TOPIC_FILTER);
			// 设置回调
			mqttClient.setCallback(new MyMqttCallback());

			Date date = new Date();
			SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
			String datef = sdf.format(date);
			System.out.println("[" + datef + "] MQTT_Client Started!");
			System.out.println("[" + datef + "] MQTT_Client Host: " + MQTT_BROKER_HOST);
			System.out.println("[" + datef + "] MQTT_Client ID: " + MQTT_CLIENT_ID);
			System.out.println("[" + datef + "] MQTT_Client Topic_Filter: " + TOPIC_FILTER);
		 catch (Exception e) 
			e.printStackTrace();
		
	



2、订阅端回调代码

package com.mqtt.client;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * MQTT 客户端回调
 * 
 * @author wxhntmy
 *
 */
public class MyMqttCallback implements MqttCallback 

	/**
	 * 连接丢失
	 */
	@Override
	public void connectionLost(Throwable arg0) 
		// TODO 自动生成的方法存根
		Date date = new Date();
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		String datef = sdf.format(date);
		System.out.println("[" + datef + "] Connection Lost!");
	

	/**
	 * 交付完成
	 */
	@Override
	public void deliveryComplete(IMqttDeliveryToken arg0) 
		// TODO 自动生成的方法存根
		Date date = new Date();
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		String datef = sdf.format(date);
		System.out.println("[" + datef + "] Delivery Complete!");
	

	/**
	 * 消息到达客户端
	 */
	@Override
	public void messageArrived(String arg0, MqttMessage arg1) throws Exception 
		// TODO 自动生成的方法存根
		Date date = new Date();
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		String datef = sdf.format(date);
		//必须设置获取消息的字符编码,否则获取到的是乱码
		String msgString = new String(arg1.getPayload(), "UTF-8");
		System.out.println(
				"[" + datef + "] Topic: " + arg0 + "  Qos: " + arg1.getQos() + "  Message: " + msgString);
	



三、发布端代码

package com.mqtt.publish;

import java.io.UnsupportedEncodingException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * 发布端
 */
public class PublishSample 

	public static void main(String[] args) throws UnsupportedEncodingException 
		// TODO 自动生成的方法存根
		// 主题
		String topic = "/wxhntmy/mqtt";
		// 内容
		String content = "hello 哈哈";
		int qos = 1;
		String broker = "tcp://localhost:1883";
		String userName = "123456";
		String password = "123456";
		String clientId = String.valueOf(System.currentTimeMillis());
		// 内存存储
		MemoryPersistence persistence = new MemoryPersistence();

		Date date = new Date();
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		String datef = sdf.format(date);
		System.out.println("[" + datef + "] MQTT_Publish Started!");
		System.out.println("[" + datef + "] MQTT_Publish Broker: " + broker);
		System.out.println("[" + datef + "] MQTT_Publish ClientID: " + clientId);
		System.out.println("[" + datef + "] MQTT_Publish Topic: " + topic);
		System.out.println("[" + datef + "] MQTT_Publish Content: " + content);

		// 创建客户端
		MqttClient sampleClient = null;
		try 
			sampleClient = new MqttClient(broker, clientId, persistence);
		 catch (MqttException e1) 
			// TODO 自动生成的 catch 块
			e1.printStackTrace();
		
		// 创建链接参数
		MqttConnectOptions connOpts = new MqttConnectOptions();
		
		try 
			
			// 在重新启动和重新连接时记住状态
			connOpts.setCleanSession(false);
			// 设置连接的用户名
			connOpts.setUserName(userName);
			connOpts.setPassword(password.toCharArray());
			connOpts.setKeepAliveInterval(20);
			// 建立连接
			sampleClient.connect(connOpts);
			// 创建消息
			//规则引擎获取到的中文乱码原因在于这里,没有设置获取Bytes的字符编码
			MqttMessage message = new MqttMessage(content.getBytes("UTF-8"));
			// 设置消息的服务质量
			message.setQos(qos);
			
			String tp = topic + "/" + String.valueOf(System.currentTimeMillis());
			
			while(true) 
				// 发布消息
				
				sampleClient.publish(tp, message);
				
				date = new Date();
				sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
				datef = sdf.format(date);
				System.out.println("[" + datef + "] topic " + tp);
				System.out.println("[" + datef + "] message " + message);
				
				try 
					Thread.sleep(10 * 1000);
				 catch (InterruptedException e) 
					// TODO 自动生成的 catch 块
					e.printStackTrace();
				
			
			
			
		 catch (MqttException me) 
			
			try 
				// 断开连接
				sampleClient.disconnect();
				// 关闭客户端
				sampleClient.close();
			 catch (MqttException e) 
				// TODO 自动生成的 catch 块
				e.printStackTrace();
			
			
			date = new Date();
			sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
			datef = sdf.format(date);
			System.out.println("[" + datef + "] reason " + me.getReasonCode());
			System.out.println("[" + datef + "] msg " + me.getMessage());
			System.out.println("[" + datef + "] loc " + me.getLocalizedMessage());
			System.out.println("[" + datef + "] cause " + me.getCause());
			System.out.println("[" + datef + "] excep " + me);
			me.printStackTrace();
		
	



以上是关于解决 EMQX 4.3 规则引擎获取消息中文乱码的主要内容,如果未能解决你的问题,请参考以下文章

EMQ X 规则引擎系列 (八)桥接消息到 MQTT Broker

EMQX Cloud更新:数据集成新增 HStreamDB & Tablestore

EMQ X Enterprise 新功能 Rule Engine 介绍

EMQX v4.4.5 发布:新增排他订阅及 MQTT 5.0 发布属性支持

NanoMQ Newsletter 2022-06|规则引擎正式发布 & NanoSDK 支持 MQTT over QUIC

Flink规则引擎实践分享