RocketMQ消息

Posted guxiaohai_

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ消息相关的知识,希望对你有一定的参考价值。

一、 MQ背景&选型

消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要具有以下优势:
    削峰填谷(主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题)
    系统解耦(解决不同重要程度、不同能力级别系统之间依赖导致一死全死)
    提升性能(当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统)
    蓄流压测(线上有些链路不好压测,可以通过堆积一定量消息再放开来压测)
目前主流的MQ主要是Rocketmq、kafka、Rabbitmq,Rocketmq相比于Rabbitmq、kafka具有主要优势特性有:

  • 支持事务型消息(消息发送和DB操作保持两方的最终一致性,rabbitmq和kafka不支持)
  • 支持结合rocketmq的多个系统之间数据最终一致性(多方事务,二方事务是前提)
  • 支持18个级别的延迟消息(rabbitmq和kafka不支持)
  • 支持指定次数和时间间隔的失败消息重发(kafka不支持,rabbitmq需要手动确认)
  • 支持consumer端tag过滤,减少不必要的网络传输(rabbitmq和kafka不支持)
  • 支持重复消费(rabbitmq不支持,kafka支持)

二:RocketMQ基本概念

Client端

  • Producer Group 一类Producer的集合名称,这类Producer通常发送一类消息,且发送逻辑一致
  • Consumer Group 一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致

Server端

  • Broker 消息中转角色,负责存储消息,转发消息,这里就是RocketMQ Server
  • Topic 消息的主题,用于定义并在服务端配置,消费者可以按照主题进行订阅,也就是消息分类,通常一个系统一个Topic
  • Message 在生产者、消费者、服务器之间传递的消息,一个message必须属于一个Topic 消息是要传递的信息。邮件中必须包含一个主题,该主题可以解释为要发送给您的信的地址。消息还可能具有可选标签和额外的键值对。例如,您可以为消息设置业务密钥,然后在代理服务器上查找消息以在开发过程中诊断问题。
  • Namesrver 一个无状态的名称服务,可以集群部署,每一个broker启动的时候都会向名称服务器注册,主要是接收broker的注册,接收客户端的路由请求并返回路由信息
  • Offset 偏移量,消费者拉取消息时需要知道上一次消费到了什么位置, 这一次从哪里开始
  • Partition 分区,Topic物理上的分组,一个Topic可以分为多个分区,每个分区是一一个有序的队列。 分区中的每条消息都会给分配一个有序的ID,也就是偏移量,保证了顺序,消费的正确性
  • Tag 用于对消息进行过滤,理解为message的标记,同一业务不同目的的message可以用相同的topic但是 可以用不同的tag来区分
  • key 消息的KEY字段是为了唯- -表示消息的,方便查问题,不是说必须设置,只是说设置为了方便开发和运维定位问题。 比如:这个KEY可以是订单ID等

三:实战工具类

  1. 生产者:
public class Producer {
    //实例化对象
    private static Producer producer = null;
    //对象构造方法
    private Producer(){};

    /**
     * @Author: gwh
     * @Description:    单例构造方法
     * @param topic 主题
     * @param tags 标签
     * @param message 内容
     * @param groupName 组名
     * @param address 地址
     * @param key key值
     * @Date: 2021-07-06 16:53
     */
    public static Producer getInstance(String topic, String tags, String message, String groupName, String address,String key) throws Exception{
        //初始化实例化对象
        if(producer == null){
            synchronized (Producer.class){
                if(producer == null){
                    producer = new Producer();
                }
            }
        }
        // TODO 1 创建消息生产者,指定生成组名
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer(groupName);
        // TODO 2 指定NameServer的地址
        defaultMQProducer.setNamesrvAddr(address);
        // TODO 3 启动生产者
        defaultMQProducer.start();
        // TODO 4 构建消息对象,主要是设置消息的主题、标签、内容
        Message resultMessage = new Message(topic, tags, message.getBytes());
        //key值
        if(null != key && !key.equals("")){
            resultMessage.setKeys(key);
        }
        // TODO 5 发送消息
        defaultMQProducer.send(resultMessage);
        // TODO 6 关闭生产者
        defaultMQProducer.shutdown();
        return producer;
    }
}
  1. 消费者:
public class Consumer {
    //实例化对象
    private static Consumer consumer = null;
    //对象构造方法
    private Consumer(){};

    /**
     * @Author: gwh
     * @Description:    单例构造方法
     * @param topic 订阅主题
     * @param groupName 组名
     * @param address 地址
     * @Date: 2021-07-06 16:53
     */
    public static Consumer getInstance(String topic, String groupName, String address) throws Exception{
        //初始化实例化对象
        if(consumer == null){
            synchronized (Consumer.class){
                if(consumer == null){
                    consumer = new Consumer();
                }
            }
        }
        // TODO 1 创建消费者,指定所属的消费者组名
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(groupName);
        // TODO 2 指定NameServer的地址
        defaultMQPushConsumer.setNamesrvAddr(address);
        // TODO 3 指定消费者订阅的主题和标签
        defaultMQPushConsumer.subscribe(topic, "*");
        // TODO 4 进行订阅:注册回调函数,编写处理消息的逻辑
        defaultMQPushConsumer.registerMessageListener((List<MessageExt> list, ConsumeConcurrentlyContext context) -> {
            try {
                for (MessageExt msg : list) {
                	//....处理回调消息
                }
            } catch (Throwable throwable) {
                throwable.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        // TODO 5 启动消费者
        defaultMQPushConsumer.start();
        return consumer;
    }
}

以上是关于RocketMQ消息的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ源码分析之从官方示例窥探:RocketMQ事务消息实现基本思想

rocketmq发送消息代码

Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息

Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息

Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息

RocketMQ学习笔记:消息发送模式