Rocketmq-Mqtt 开发实例

Posted Doker 多克

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rocketmq-Mqtt 开发实例相关的知识,希望对你有一定的参考价值。

Doker官网:Doker 多克

一、RocketMQ MQTT 概览

传统的消息队列MQ主要应用于服务(端)之间的消息通信,比如电商领域的交易消息、支付消息、物流消息等等。然而在消息这个大类下,还有一个非常重要且常见的消息领域,即IoT类终端设备消息。近些年,我们看到随着智能家居、工业互联而兴起的面向IoT设备类的消息正在呈爆炸式增长,而且已经发展十余年的移动互联网的手机APP端消息仍然是数量级庞大。面向终端设备的消息数量级比传统服务端的消息要大很多量级并仍然在快速增长。

如果可以有一个统一的消息系统(产品)来提供多场景计算(如stream、event)、多场景(IoT、APP)接入,其实是非常有价值的,因为消息也是一种重要数据,数据如果只存在一个系统内,可以最大地降低存储成本,同时可以有效地避免数据因在不同系统间同步带来的一致性难题和挑战。

基于此,我们引入了RocketMQ-MQTT这个扩展项目来实现RocketMQ统一接入IoT设备和服务端的消息,提供一体化消息存储和互通能力。

1、MQTT协议

在IoT终端场景,目前业界广泛使用的是MQTT协议,是起源于物联网IoT场景,OASIS联盟定义的标准的开放式协议。因为IoT设备种类繁多,运行环境各异,一个标准的接入协议尤为关键。

MQTT协议定义的是一个Pub/Sub的通信模型,这个与RocketMQ是类似的,不过其在订阅方式上比较灵活,可以支持多级Topic订阅(如 “/t/t1/t2”),甚至可以支持通配符订阅(如 “/t/t1/+”)。

2、模型介绍

队列存储模型

我们设计了一种多维度分发的Topic队列模型,如上图所示,消息可以来自各个接入场景(如服务端的MQ/AMQP、客户端的MQTT),但只会写一份存到commitlog里面,然后分发出多个需求场景的队列索引(ConsumerQueue),如服务端场景(MQ/AMQP)可以按照一级Topic队列进行传统的服务端消费,客户端MQTT场景可以按照MQTT多级Topic以及通配符订阅进行消费消息。

这样的一个队列模型就可以同时支持服务端和终端场景的接入和消息收发,达到一体化的目标。

推拉模型

上图展示的是一个推拉模型,图中的P节点是一个协议网关或broker插件,终端设备通过MQTT协议连到这个网关节点。消息可以来自多种场景(MQ/AMQP/MQTT)发送过来,存到Topic队列后会有一个notify逻辑模块来实时感知这个新消息到达,然后会生成消息事件(就是消息的Topic名称),将该事件推送至网关节点,网关节点根据其连上的终端设备订阅情况进行内部匹配,找到哪些终端设备能匹配上,然后会触发pull请求去存储层读取消息再推送至终端设备。

3、架构概览

我们的目标是期望基于RocketMQ实现一体化且自闭环,但不希望Broker被侵入更多场景逻辑,我们抽象了一个协议计算层,这个计算层可以是一个网关,也可以是一个broker插件。Broker专注解决Queue的事情以及为了满足上面的计算需求做一些Queue存储的适配或改造。协议计算层负责协议接入,并且要可插拔部署。

本实例分四部分:

  • MqttConsumer.java // MQTT客户端启动订阅消息

  • MqttProducer.java // MQTT客户端启动发布消息

  • RocketMQConsumer.java //RocketMQ客户端启动订阅消息

  • RocketMQProducer.java // RocketMQ客户端启动发布消息

二、引入依赖

<dependencies>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>mqtt-common</artifactId>
        </dependency>
    </dependencies>

三、Mqtt消息生产者MqttProducer:

import org.apache.rocketmq.mqtt.common.util.HmacSHA1Util;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class MqttProducer 
    public static void main(String[] args) throws InterruptedException, MqttException, NoSuchAlgorithmException, InvalidKeyException 
        MemoryPersistence memoryPersistence = new MemoryPersistence();
        String brokerUrl = "tcp://" + System.getenv("host") + ":1883";
        String firstTopic = System.getenv("topic");
        String sendClientId = "send01";
        MqttConnectOptions mqttConnectOptions = buildMqttConnectOptions(sendClientId);
        MqttClient mqttClient = new MqttClient(brokerUrl, sendClientId, memoryPersistence);
        mqttClient.setTimeToWait(5000L);
        mqttClient.setCallback(new MqttCallbackExtended() 
            @Override
            public void connectComplete(boolean reconnect, String serverURI) 
                System.out.println(sendClientId + " connect success to " + serverURI);
            

            @Override
            public void connectionLost(Throwable throwable) 
                throwable.printStackTrace();
            

            @Override
            public void messageArrived(String topic, MqttMessage mqttMessage) 
            

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) 
            
        );
        try 
            mqttClient.connect(mqttConnectOptions);
         catch (Exception e) 
            e.printStackTrace();
        
        long interval = 1000;
        for (int i = 0; i < 1000; i++) 
            String msg = "r1_" + System.currentTimeMillis() + "_" + i;
            MqttMessage message = new MqttMessage(msg.getBytes(StandardCharsets.UTF_8));
            message.setQos(1);
            String mqttSendTopic = firstTopic + "/r1";
            mqttClient.publish(mqttSendTopic, message);
            System.out.println(now() + "send: " + mqttSendTopic + ", " + msg);
            Thread.sleep(interval);

            mqttSendTopic = firstTopic + "/r/wc";
            msg = "wc_" + System.currentTimeMillis() + "_" + i;
            MqttMessage messageWild = new MqttMessage(msg.getBytes(StandardCharsets.UTF_8));
            messageWild.setQos(1);
            mqttClient.publish(mqttSendTopic, messageWild);
            System.out.println(now() + "send: " + mqttSendTopic + ", " + msg);
            Thread.sleep(interval);

            mqttSendTopic = firstTopic + "/r2";
            msg = "msgQ2_" + System.currentTimeMillis() + "_" + i;
            message = new MqttMessage(msg.getBytes(StandardCharsets.UTF_8));
            message.setQos(2);
            mqttClient.publish(mqttSendTopic, message);
            System.out.println(now() + "send: " + mqttSendTopic + ", " + msg);
            Thread.sleep(interval);
        
    

    private static MqttConnectOptions buildMqttConnectOptions(String clientId) throws NoSuchAlgorithmException, InvalidKeyException 
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        connOpts.setKeepAliveInterval(60);
        connOpts.setAutomaticReconnect(true);
        connOpts.setMaxInflight(10000);
        connOpts.setUserName(System.getenv("username"));
        connOpts.setPassword(HmacSHA1Util.macSignature(clientId, System.getenv("password")).toCharArray());
        return connOpts;
    

    private static String now() 
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
        return sf.format(new Date()) + "\\t";
    

四、Mqtt消费者MqttConsumer

import org.apache.rocketmq.mqtt.common.util.HmacSHA1Util;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class MqttConsumer 
    public static void main(String[] args) throws MqttException, NoSuchAlgorithmException, InvalidKeyException 
        String brokerUrl = "tcp://" + System.getenv("host") + ":1883";
        String firstTopic = System.getenv("topic");
        MemoryPersistence memoryPersistence = new MemoryPersistence();
        String recvClientId = "recv01";
        MqttConnectOptions mqttConnectOptions = buildMqttConnectOptions(recvClientId);
        MqttClient mqttClient = new MqttClient(brokerUrl, recvClientId, memoryPersistence);
        mqttClient.setTimeToWait(5000L);
        mqttClient.setCallback(new MqttCallbackExtended() 
            @Override
            public void connectComplete(boolean reconnect, String serverURI) 
                System.out.println(recvClientId + " connect success to " + serverURI);
                try 
                    final String topicFilter[] = firstTopic + "/r1", firstTopic + "/r/+", firstTopic + "/r2";
                    final int[] qos = 1, 1, 2;
                    mqttClient.subscribe(topicFilter, qos);
                 catch (Exception e) 
                    e.printStackTrace();
                
            

            @Override
            public void connectionLost(Throwable throwable) 
                throwable.printStackTrace();
            

            @Override
            public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception 
                try 
                    String payload = new String(mqttMessage.getPayload());
                    String[] ss = payload.split("_");
                    System.out.println(now() + "receive:" + topic + "," + payload);
                 catch (Exception e) 
                    e.printStackTrace();
                
            

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) 
            
        );

        try 
            mqttClient.connect(mqttConnectOptions);
         catch (Exception e) 
            e.printStackTrace();
            System.out.println("connect fail");
        
    

    private static MqttConnectOptions buildMqttConnectOptions(String clientId) throws NoSuchAlgorithmException, InvalidKeyException 
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        connOpts.setKeepAliveInterval(60);
        connOpts.setAutomaticReconnect(true);
        connOpts.setMaxInflight(10000);
        connOpts.setUserName(System.getenv("username"));
        connOpts.setPassword(HmacSHA1Util.macSignature(clientId, System.getenv("password")).toCharArray());
        return connOpts;
    

    private static String now() 
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
        return sf.format(new Date()) + "\\t";
    

五、RocketMQ客户端启动发布消息RocketMQProducer

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.mqtt.common.util.TopicUtils;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class RocketMQProducer 
    private static DefaultMQProducer producer;
    private static String firstTopic = System.getenv("firstTopic");
    private static String recvClientId = "recv01";

    public static void main(String[] args) throws Exception 
        //Instantiate with a producer group name.
        producer = new DefaultMQProducer("PID_TEST");
        // Specify name server addresses.
        producer.setNamesrvAddr(System.getenv("namesrv"));
        //Launch the instance.
        producer.start();

        for (int i = 0; i < 1000; i++) 
            //Create a message instance, specifying topic, tag and message body.

            //Call send message to deliver message to one of brokers.
            try 
                sendMessage(i);
                Thread.sleep(1000);
                sendWithWildcardMessage(i);
                Thread.sleep(1000);
             catch (Exception e) 
                e.printStackTrace();
            
        
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    

    private static void setLmq(Message msg, Set<String> queues) 
        msg.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH,
                StringUtils.join(
                        queues.stream().map(s -> StringUtils.replace(s, "/", "%")).map(s -> MixAll.LMQ_PREFIX + s).collect(Collectors.toSet()),
                        MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
    

    private static void sendMessage(int i) throws MQBrokerException, RemotingException, InterruptedException, MQClientException 
        Message msg = new Message(firstTopic,
                "MQ2MQTT",
                ("MQ_" + System.currentTimeMillis() + "_" + i).getBytes(StandardCharsets.UTF_8));
        String secondTopic = "/r1";
        setLmq(msg, new HashSet<>(Arrays.asList(TopicUtils.wrapLmq(firstTopic, secondTopic))));
        SendResult sendResult = producer.send(msg);
        System.out.println(now() + "sendMessage: " + new String(msg.getBody()));
    

    private static void sendWithWildcardMessage(int i) throws MQBrokerException, RemotingException, InterruptedException, MQClientException 
        Message msg = new Message(firstTopic,
                "MQ2MQTT",
                ("MQwc_" + System.currentTimeMillis() + "_" + i).getBytes(StandardCharsets.UTF_8));
        String secondTopic = "/r/wc";
        Set<String> lmqSet = new HashSet<>();
        lmqSet.add(TopicUtils.wrapLmq(firstTopic, secondTopic));
        lmqSet.addAll(mapWildCardLmq(firstTopic, secondTopic));
        setLmq(msg, lmqSet);
        SendResult sendResult = producer.send(msg);
        System.out.println(now() + "sendWcMessage: " + new String(msg.getBody()));
    

    private static Set<String> mapWildCardLmq(String firstTopic, String secondTopic) 
        // todo by yourself
        return new HashSet<>(Arrays.asList(TopicUtils.wrapLmq(firstTopic, "/r/+")));
    

    private static String now() 
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
        return sf.format(new Date()) + "\\t";
    

六、RocketMQ客户端启动订阅消息RocketMQConsumer

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.mqtt.common.model.Constants;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

public class RocketMQConsumer 

    public static void main(String[] args) throws MQClientException 
        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GID_test01");

        // Specify name server addresses.
        consumer.setNamesrvAddr(System.getenv("namesrv"));

        // Subscribe one more more topics to consume.
        String firstTopic = System.getenv("firstTopic");
        consumer.subscribe(firstTopic, Constants.MQTT_TAG);
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() 

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) 
                MessageExt messageExt = msgs.get(0);
                System.out.println(now() + "Receive: " + new String(messageExt.getBody()));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            
        );

        //Launch the consumer instance.
        consumer.start();

        System.out.printf("Consumer Started.%n");
    

    private static String now() 
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
        return sf.format(new Date()) + "\\t";
    

大家好,我是Doker品牌的Sinbad,欢迎点赞和评论,您的鼓励是我们持续更新的动力!欢迎加微信进入技术群聊!

DCloud开发

DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发DCloud开发

以上是关于Rocketmq-Mqtt 开发实例的主要内容,如果未能解决你的问题,请参考以下文章

rocketmq 命令示例

消息队列概述

RocketMQ

分布式系统之消息队列

搭建 Apache RocketMQ 单机环境

rocketMq - rebalance介绍