消息中间件 - RocketMQ 详解(从软件安装到案例实现)

Posted Yan Yang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息中间件 - RocketMQ 详解(从软件安装到案例实现)相关的知识,希望对你有一定的参考价值。

内容

参考博文:https://www.cnblogs.com/xiaoxiongcanguan/p/11510366.html


一、RocketMQ 安装

博客地址Windows 下安装 RocketMQ

博客地址Linux 下安装 RocketMQ


二、 RocketMQ 作用和结构

1. RocketMQ 特点

RocketMQ 是阿里巴巴开源的分布式消息中间件。支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。它里面有几个区别于标准消息中件间的概念,如Group、Topic、Queue等。系统组成则由Producer、Consumer、Broker、NameServer等。

2 RocketMQ 执行流程


3. RocketMQ 作用

3.1 消息中间件结构图



3.2 应用解耦

微服务架构中,存在着众多子系统,共同完成对外部用户的服务。同时用户的一个下单行为横跨了N个业务子系统,如果按照传统的同步串行方式一个接一个的调用,用户的下单操作将会执行较长的时间,对用户不友好。同时,由于是同步调用,一旦某一个子系统出现了宕机,访问超时等问题,整个下单业务都将陷入瘫痪。
消息队列可以将同步的系统调用转为异步的消息投递,一定程度上解除业务子系统间的耦合。


3.3 削峰填谷

  大多数系统的访问流量并不是一天24小时均匀稳定的,而是存在着一定的突发性。例如电商的秒杀活动,系统配置在平时能承受住500qps,可在进行秒杀活动时,瞬时的qps可能达到了5000,为平常的10倍,如果不进行处理防护,将会导致服务瘫痪。
  可以选择扩容服务器来应对可能的高峰流量,但扩容的服务器在秒杀活动过去之后多数会被闲置,从而造成很大的浪费;也可以设定并发的阈值,在访问并发数达到一定程度时就进行熔断限流,拒绝手慢的秒杀用户下单,可这样会让用户体验很差。
  这时,消息队列就能派上用场了。我们可以在系统中使用消息队列作为缓冲,将每一个用户下单请求都作为一条消息存入消息队列,消息队列会根据消费者的消费速度以一种稳定的方式将流量传递给下游消费者系统,在消费者系统处理完下单操作后异步的通知用户下单结果。虽然用户可能会延迟一段时间才能得到反馈,但无论如何也比无法下单要好。
  消息队列就像一个漏桶,可以将瞬时的尖峰流量缓存起来,并以一种稳定的速度传递给下游消费者,从而达到流量削峰的目的。


4. rocketmq 组成部分

rocketmq由四大核心模块组成:producer、consumer、brokerServer、nameServer。其中brokerServer和nameServer是rocketmq的服务端,两者一起独立的对外提供服务;而producer和consumer可看做是rocketmq的客户端,一般依附于业务应用程序。

  1. Producer
    【1】producer负责发送消息。使用producer将消息发送到brokerServer,由brokerServer统一进行消息的分发。
    【2】rocketmq支持多种消息发送方式,如同步消息发送、异步回调消息发送、顺序消息发送以及一次性消息发送(异步无回调)。除了一次性消息发送,其余的发送方式均需要brokerServer返回发送结果的确认消息。
    【3】特别的是,rocketmq的一大特色是支持发送事务消息(半消息),能一定程度上解决分布式事务的问题。

  2. Consumer
    【1】consumer 负责消费producer发送的消息。consumer会从brokerServer获取消息,并传递给应用程序。
    【2】rocketMQ使用的消息原语是At Least Once(至少一次成功消费),如果一定时间内没有接收到consumer消息确认消费的响应结果,会将同一条消息再次投递给consumer。rocketmq采用ack机制保证消息的消费成功,所以consumer可能会多次收到同一条消息,需要consumer的业务方做好幂等防护。
    【3】使用者的角度来看,consumer分为两种方式来获取信息。一种是推模式(push consume),推模式看起来像是brokerServer将消息推给了consumer;另一种是拉模式(pull consume),拉模式看起来像是consumer主动的去brokerServer拉取消息(实际上,推模式是基于拉模式实现的)。

  3. BrokerServer
    【1】brokerServer负责消息的接收,存储和分发,是rocketmq最核心,最重量级的组成部分。
    【2】为实现高可用和高吞吐,brokerServer通常采用集群部署,共同对外提供服务。

  4. NameServer
    【1】nameServer负责提供路由元数据。例如,brokerServer通常是集群部署的,其拓扑结构会经常的发生变化。如果每次集群中broker机器的上下线都需要通知所有的消费者、生产者,效率太低。
    【2】因此,rocketmq引入了nameServer作为brokerServer路由信息的维护者,broker的每次上下线都和nameServer通信,由nameServer来维护broker的路由信息,而producer和consumer通过访问nameServer获得对应broker的访问地址后,再向对应的broker发起请求。nameServer解除了broker和客户端的耦合依赖关系,大大提高了效率。
    【3】在其它主流消息队列中也存在着类似的维护元信息功能的组件,如zookeeper等。rocketmq的设计者认为zk的功能过于强大,杀鸡焉用牛刀,通过一个精简版的元数据服务nameServer,以减少对外部系统的耦合依赖,得以提供更可靠的服务。
    【4】nameServer同样能以集群形式对外提供服务。但和zk集群不同的是,集群内的nameServer服务器并不会互相通信,而是保持相互独立。

5. rocketmq基本概念模型

介绍完rocketmq的组成部分之后,还需要再引入一些相关概念才能更好的理解rocketmq:

  1. topic 主题
    topic主题,代表一系列消息的集合,任何消息只能属于一个topic主题,主题是rocketmq进行消息发布订阅的最小单位。业务方可以通过创建并订阅各式各样的主题来满足自身的业务要求。不同主题之间的消息在逻辑上没有关联。

  2. tag 标签
    tag标签,tag从属于topic主题,主要用于对同一主题下的消息进行进一步区分。标签可以简单的认为是二级主题,通过tag标签功能,业务方可以方便的实现对各种二级主题的消费需求。

  3. group 组
    group组,代表着同一类客户端的集合。具体可分为消费者组(consumer group)和生产者组(producer group)两种。消费者组和生产者组之间没有任何关联(即使组名一样)。
    消费者组
      消费者组代表着同一类型的消费者集群。同一消费者组内的消费者通常消费同样的消息且消息消费逻辑一致。消费者组的概念使得consumer集群在消费消息时,rocketmq可以通过负载均衡来做到消费消息时的高可用和容错。消费者组的更多作用将会在后面的集群/广播消费模式中继续讲解。
    生产者组
      生产者组代表着同一类型的生产者集群。一般来说,消息的生产者在发出了消息得到确认之后便完成了任务,似乎没有必要为此抽象出生产者集群的概念。
      前面说到,rocketmq具有发送事务消息的特性,发送事务消息简单来说就是生产者先发送出一个半消息(预消息),然后执行本地的事务,在事务完成提交之后再跟着发送一个事务确认消息。半消息和普通消息的最大区别在于,半消息在投递给broker之后,broker不会马上让消费者进行消费,而是等待。只有当接收到生产者后续对应的的事务确认消息后,预消息和确认消息合二为一,才将对应的事务消息交给消费者去消费;而如果最终没有接收到事务确认消息,则会将消息直接删除不投递给消费者,以达到类似事务回滚的效果。事务消息对消费者来说是透明无感知的。
      可如果生产者在发送了预消息之后挂了怎么办?为解决这个问题,broker会在一定时间没有收到确认消息后,定时的回查生产者当前事务消息的状态,回查的范围是整个生产者组中的某一个在线节点。这种情况下,生产者和消费者一样,也构成了一个集群监听来自broker的回查。这样,即使发送消息的生产者发生了故障,在一定条件下整个生产者集群的事务消息发送功能依然可以正常运转。
      通过生产者组的概念,rocketmq实现了事务消息投递的高可用。

  4. message 消息
    message消息是rocketmq中传递消息的主体,消息具有全局唯一的messageID属性,用户可以根据messageID查询进行消息的精确查询。
    消息的内容可以是不超过rocketmq限制的、二进制的任意数据,rocketmq不会对消息承载的数据内容做任何干预。

  5. 集群(Clustering)/广播(Broadcasting)消费
    集群消费
      对于任意一条被订阅的消息,同一消费者组下的节点只有一个节点对其进行消费;一个消费者组中的全部节点分摊所有消息。
    广播消费
      对于任意一条被订阅的消息,同一消费者组下的所有节点都会对其进行消费;一个消费者组中的全部节点都能接收到全量的消息。
    混合模式消费
      实质上是前两者的综合。同一应用集群构成一个消费者组,不同应用集群之间构成多个不同的消费者组,但却可以订阅同一个topic/tag下的消息。
      对于任意一条被订阅的消息,同一消费者组之间只会有一个节点对其进行消费,不同消费者组都会进行全量消息的消费。


三、生产消息的类型有三种

  1. 同步发送消息 : syncSend()
    特点 : 等待发送的消息到达 RocketMQ 中,持久化到硬盘后,得到返回值才执行后面的代码

    rocketMQTemplate.syncSend("springboot-hello",msg);
    
  2. 异步发送消息 :asyncSend()
    特点 : 消息发送后,即执行后面的代码 , 有回调方法,成功和失败会回调对应的方法

    rocketMQTemplate.asyncSend("springboot-hello", msg, new SendCallback() {
        //两个方法
    
  3. 一次性发送 : sendMsgOne()
    特点 : 消息发送后,即执行后面的代码 ,没有回调方法

    rocketMQTemplate.sendOneWay("springboot-hello",msg);
    

四、消费模式有两种

1,集群模式 : 针对于一个集群的操作,只有一个节点接收到消息 msg

2,广播模式 ; 所有的都会收到消息 msg ,例如一个订单消息 , 消息服务 和 邮件服务 都需要收到消息 msg

1,集群模式 messageModel = MessageModel.CLUSTERING ,
对比上次,相同的消息内容只会接收一次,如果内容没有改变,不执行
    @RocketMQMessageListener(
        consumerGroup = "springboot-consumer-group",
        topic = "springboot-hello-model",
        messageModel = MessageModel.CLUSTERING   
    //增加这个属性设置为集群模式,所有的节点都要一起设置
)

2 , 广播模式 ,共同执行
    @RocketMQMessageListener(
        consumerGroup = "springboot-consumer-group",
        topic = "springboot-hello-model",
        messageModel = MessageModel.BROADCASTING
)

复杂场景, 当一个消息msg  ,有四个服务需要接收 , 其中两个是一个集群,只需要一个消费消息即可,另外两个是都需要消费的情景 
    将两个集群分成一个 消费组  consumerGroup = "springboot-clu",
    两个需要共同消费的一个组 , consumerGroup = "springboot-bro",
    根据组来设置消费模式是集群还是广播       

五、延时消息

指定延时执行时间, 消费者根据指定的时间,到点后执行

场景 : 用户下单后未立即付款 ,订单保留15分组 ,利用延时消息 ,到点后检查用户是否支付,未支付取消订单并回滚库存逻辑

需要一个msg对象 : 可以使用API来构造 ,可以自己new出来

//延时发送
    @RequestMapping("/sendMsgDelay")
    public Object sendMsgDelay(String msg){
        /**
         * 第一个参数 : topic  ":"  tag
         * 第二个参数 : message 对象, 将msg消息 构建成一个message 对象
         * 第三个参数 : 发送超时时间,如果因为网络波动迟迟没有发送到 RocketMQ中,就会报超时错误,
         * 第四个参数 : 延迟消费等级 ,对应的等级对应不同的时间
         */
        //底层是对 原生API 的封装, msg不需要转换成byte[],底层转
        // syncSend 同步请求
        MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(msg);
        rocketMQTemplate.syncSend("springboot-delay",messageBuilder.build(),5000,2);
        return "发送成功";
    }

对应的延时等级 ,默认
    messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    可以手动在配置文件中改

六、消息过滤

作用 : 通过生产者的消息标签,消费者根据标签的条件看是否符合,符合才消费,(消费者选择是否消费)

两种实现

TAG 标签
SQL92

需要在配置应文件中配置 : 应用目录 conf/broker.conf 文件中配置

enablePropertyFilter=true //这行就是配置使用 SQL92
namesrvAddr=127.0.0.1:9876

消费者中代码
     @RequestMapping("/sendMsgTag")
    public Object sendMsgTag(String msg,String tag){
        /**
         * 第一个参数 : topic  ":"  tag    tag为标签
         * 第二个参数 : message 消息内容
         */
        //底层是对 原生API 的封装, msg不需要转换成byte[],底层转
        // syncSend 同步请求
        rocketMQTemplate.syncSend("springboot-tag:"+tag,msg);
        return "发送成功";
    }

消费者中代码
    @RocketMQMessageListener(
        consumerGroup = "springboot-consumer-group",
        topic = "springboot-tag",
        selectorType = SelectorType.TAG,  //默认也是 TAG
        selectorExpression = "TagA || TagC"       //默认是 * , || 是或的意思
)
public class Consumer4 implements RocketMQListener<String> {
    /**
     * 消费者会一直监听指定主题下面的消息,如果消息来了就执行 onMessage 方法
     * 参数是我吗的消息内容
     * @param msg 消息内容
     */
    @Override
    public void onMessage(String msg) {
        System.out.println("消息内容为:" + msg);
    }
}


SQL92方式
生产者的方法需要注意
     @RequestMapping("/sendMsgSql")
    public Object sendMsgSql(String msg,int age){
        /**
         * 第一个参数 : topic  ":"  tag
         * 第二个参数 : message 消息内容
         */
        //底层是对 原生API 的封装, msg不需要转换成byte[],底层转
        // syncSend 同步请求
        Map<String ,Object> map = new HashMap<>();
        map.put("age",age);
        rocketMQTemplate.convertAndSend("springboot-sql",msg,map);
        return "发送成功";
    }



消费者中修改Type
    @RocketMQMessageListener(
        consumerGroup = "springboot-consumer-group",
        topic = "springboot-sql",
        selectorType = SelectorType.SQL92,
        selectorExpression = "age > 10"    //可以做很多的比较 ,not , or 等等都可以
)

七、RocketMQ 集成 SpringBoot

博客地址:RocketMQ 集成 SpringBoot


总结

上面就是 RocketMQ 的使用了,代码仅供参考,欢迎讨论交流。

以上是关于消息中间件 - RocketMQ 详解(从软件安装到案例实现)的主要内容,如果未能解决你的问题,请参考以下文章

消息中间件 - RocketMQ 详解(从软件安装到案例实现)

干货消息中间件—RocketMQ消息消费

RocketMQ 详解系列

RocketMQ 详解系列

RocketMQ 详解系列

详解文件IO系列讲讲 MQ 消息中间件 (Kafka,RocketMQ等)与 MMAPPageCache 的故事...