RocketMQ笔记:普通消息

Posted 无虑的小猪

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ笔记:普通消息相关的知识,希望对你有一定的参考价值。

  普通消息为 RocketMQ 中最基础的消息,支持生产者和消费者的异步解耦通信。

一、普通消息的生命周期

 

1、初始化

  消息被生产者构建并完成初始化,待发送到服务端的状态。

2、待消费

  消息被发送到服务端,对消费者可见,等待消费者消费的状态。

3、消费中

  消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ会对消息进行重试处理。

4、消费提交

  消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

5、消息删除

  RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

二、普通消息的发送方式

  RocketMQ提供了三种发送消息的模式,分别为同步发送消息、异步发送消息、单向发送消息。

 public enum CommunicationMode 
     SYNC,  // 同步
     ASYNC,  // 异步
     ONEWAY;  // 单向
 
     private CommunicationMode() 
     
 

1、同步发送

  同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求。

  

2、异步发送

  消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理。

  异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

  

3、单向发送

  单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。

  单向发送方式发送消息的过程耗时非常短,一般在微秒级别。不需要关心发送结果的场景,例如日志发送。

  

三、普通消息的消费方式

 // 消息消费模式
 public enum MessageModel 
     // 广播
     BROADCASTING("BROADCASTING"),
     // 负载均衡
     CLUSTERING("CLUSTERING");
     private String modeCN;
     private MessageModel(String modeCN) 
         this.modeCN = modeCN;
     
     public String getModeCN() 
         return this.modeCN;
     
 

  在实际的开发过程中,使用consumer的setMessageModel()方法,指定消费方式。

1、负载均衡消费模式 - CLUSTERING

  消费者采用负载均衡方式消费消息,一个分组(Group)下的多个消费者共同消费队列消息,每个消费者处理的消息不同。

  一个Consumer Group中的各个Consumer实例分摊去消费消息,即一条消息只会投递到一个Consumer Group下面的一个实例。

  

  例如某个Topic有3个队列,其中一个Consumer Group 有 3 个实例,那么每个实例只消费其中的1个队列。集群消费模式是消费者默认的消费方式。

2、广播消费模式 - BROADCASTING

  广播消费模式中消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即使这些 Consumer属于同一个Consumer Group,消息也会被Consumer Group 中的每个Consumer都消费一次。

 

  

  实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。

3、消费模式特点

1、负载均衡消费模式

  消费端集群化部署,每条消息只需要被处理一次;

  每一条消息都只会被分发到一台机器上处理;

  不保证每一次失败重投的消息路由到同一台机器上。

2、集群消费模式

  每条消息都需要被相同逻辑的多台机器处理;

  消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投;

  客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过;

  广播消费模式下不支持重置消费位点,广播模式下服务端不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能;

  广播消费模式下不支持顺序消息。

 

 

四、普通消息的示例demo

  工具类详见:RocketMQ笔记(六):示例代码工具类

1、消息发送封装类 SDKSendMsg

 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.RequestCallback;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import java.util.Objects;
 
 /**
  * @Description: 发送消息方式
  */
 public class SDKSendMsg 
     // 默认组
     private final static String DEFAULT_GROUP_NAME = "test-group";
     // 测试地址
     private final static String DEFAULT_NAMESRV_ADDR = "192.168.33.55:9876";
     // 默认Topic
     public final static String DEFAULT_TOPIC = "TestTopic";
     // 同步消息的标签与键
     public final static String DEFAULT_SYNC_MSG_TAG = "SYNC_TAG";
     public final static String DEFAULT_SYNC_MSG_KEY = "SYNC_KEY";
     // 异步消息的标签与键
     public final static String DEFAULT_ASYNC_MSG_TAG = "ASYNC_TAG";
     public final static String DEFAULT_ASYNC_MSG_KEY = "ASYNC_KEY";
     // 单向
     public final static String DEFAULT_ONEWAY_MSG_TAG = "ONEWAY_TAG";
     public final static String DEFAULT_ONEWAY_MSG_KEY = "ONEWAY_KEY";
 
     // 声明生产者
     private DefaultMQProducer producer;
 
     public SDKSendMsg() 
         // 启动生产者实例
         try 
             // 实例化生产者组名称
             DefaultMQProducer producer = new DefaultMQProducer(DEFAULT_GROUP_NAME);
             // 指定name server地址
             producer.setNamesrvAddr(DEFAULT_NAMESRV_ADDR);
             this.producer = producer;
             this.producer.start();
          catch (MQClientException e) 
             e.printStackTrace();
         
     
 
     /**
      * 关闭生产者实例
      */
     public void shutdownProducer() 
         this.producer.shutdown();
     
 
 
     /**
      * 同步发送消息, 使用默认超时时间
      * @param topic  主题
      * @param msgTag 消息标签
      * @param msgKey 消息key
      * @param msgBody 消息内容
      * @return
      */
     public SendResult syncSendMsg(String topic, String msgTag, String msgKey, String msgBody) 
         return syncSendMsg(topic, msgTag, msgKey, msgBody, null);
     
 
     /**
      * 同步发送消息, 使用指定的超时时间
      * @param topic  主题
      * @param msgTag 消息标签
      * @param msgKey 消息key
      * @param msgBody 消息内容
      * @param timeout 超时时间
      * @return
      */
     public SendResult syncSendMsg(String topic, String msgTag, String msgKey, String msgBody, Long timeout) 
         System.out.println("发送同步消息: " + msgBody);
         try 
             Message msg = new Message(topic, msgTag, msgKey, msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
             SendResult send;
             if (Objects.isNull(timeout)) 
                 send = this.producer.send(msg);
                 System.out.printf("消息发送结果:%s%n", send);
                 return send;
             
             send = this.producer.send(msg, timeout);
             System.out.printf("消息发送结果:%s%n", send);
             return send;
          catch (Exception e) 
             e.printStackTrace();
         
         return null;
     
 
     /**
      * 异步发送消息,使用默认的回调处理
      * @param topic  主题
      * @param msgTag 消息标签
      * @param msgKey 消息key
      * @param msgBody 消息内容
      * @param timeout 超时时间
      * @return
      */
     public Message asynSendMsg(String topic, String msgTag, String msgKey, String msgBody, Long timeout) 
         System.out.println("发送异步消息: " + msgBody);
         try 
             Message msg = new Message(topic, msgTag, msgKey, msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
             return this.producer.request(msg, timeout);
          catch (Exception e) 
             e.printStackTrace();
         
         return null;
     
 
     /**
      * 异步发送消息,自定义回调处理
      * @param topic  主题
      * @param msgTag 消息标签
      * @param msgKey 消息key
      * @param msgBody 消息内容
      * @param timeout 超时时间
      * @param callback 回调处理
      */
     public void asynSendMsg(String topic, String msgTag, String msgKey, String msgBody, Long timeout, RequestCallback callback) 
         System.out.println("发送异步消息: " + msgBody);
         try 
             Message msg = new Message(topic, msgTag, msgKey, msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
             this.producer.request(msg, callback, timeout);
          catch (Exception e) 
             e.printStackTrace();
         
     
 
     /**
      * 发送单向消息
      * @param topic
      * @param msgTag
      * @param msgKey
      * @param msgBody
      */
     public void sendOneWay(String topic, String msgTag, String msgKey, String msgBody) 
         System.out.println("发送单向消息: " + msgBody);
         try 
             Message msg = new Message(topic, msgTag, msgKey, msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
             this.producer.sendOneway(msg);
          catch (Exception e) 
             e.printStackTrace();
         
     
 

2、生产者 ProducerMsg

 import org.apache.rocketmq.client.producer.RequestCallback;
 import org.apache.rocketmq.common.message.Message;
 /**
  * @Description: 发送消息
  */
 public class ProducerMsg 
     public static void main(String[] args) 
         // 创建消息发送实例
         SDKSendMsg sdkSendMsg = new SDKSendMsg();
 
         // 同步消息发送
         String syncMsg = "同步消息 -- " + 0;
         sdkSendMsg.syncSendMsg(SDKSendMsg.DEFAULT_TOPIC, SDKSendMsg.DEFAULT_SYNC_MSG_TAG,
                 SDKSendMsg.DEFAULT_SYNC_MSG_KEY, syncMsg);
 
         // 异步消息发送
         String asynMsg = "异步消息 --" + 1;
         sdkSendMsg.asynSendMsg(SDKSendMsg.DEFAULT_TOPIC, SDKSendMsg.DEFAULT_ASYNC_MSG_TAG,
                 SDKSendMsg.DEFAULT_ASYNC_MSG_KEY, asynMsg, 3000l, new RequestCallback() 
                     @Override
                     public void onSuccess(Message message) 
                         System.out.println("异步消息发送成功");
                         // TODO 业务数据状态更新
                     
                     @Override
                     public void onException(Throwable throwable) 
                         System.out.println("异步消息发送失败");
                         // TODO 业务状态回滚
                     
                 );
 
         // 单向消息发送
         String oneWayMsg = "单向消息 --" + 2;
         sdkSendMsg.sendOneWay(SDKSendMsg.DEFAULT_TOPIC, SDKSendMsg.DEFAULT_ONEWAY_MSG_TAG,
                 SDKSendMsg.DEFAULT_ONEWAY_MSG_KEY, oneWayMsg);
 
         // 销毁生产者实例
         sdkSendMsg.shutdownProducer();
     
 

3、消费者 ProducerMsg

 import com.snails.rmq.common.RMQConstant;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 
 import java.io.UnsupportedEncodingException;
 import java.util.List;
 
 /**
  * @Description: RocketMQ并发消费
  */
 public class ConsumerMsg 
     public static void main(String[] args) throws MQClientException 
 
         // 实例化消费者组名称
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RMQConstant.TEST_GROUP);
         // 指定name server地址
         consumer.setNamesrvAddr(RMQConstant.NAEMSRV_ADDR);
         // 订阅至少一个主题以供消费
         consumer.subscribe(RMQConstant.TEST_TOPIC, "*");
         // 负载均衡消费模式
         consumer.setMessageModel(MessageModel.CLUSTERING);
         // 广播消费模式
 //        consumer.setMessageModel(MessageModel.BROADCASTING);
         //  注册回调,处理从服务端获取的消息
         consumer.registerMessageListener(new MessageListenerConcurrently() 
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) 
                 for (MessageExt msg : msgs) 
                     System.out.println("当前消息的KEY: " + msg.getKeys());
                     try 
                         if (SDKSendMsg.DEFAULT_SYNC_MSG_KEY.equals(msg.getKeys())) 
                             System.out.println(String.format("线程%s,接收同步消息:%s", Thread.currentThread().getName(), new String(msg.getBody(), "UTF-8")));
                          else if (SDKSendMsg.DEFAULT_ASYNC_MSG_KEY.equals(msg.getKeys())) 
                             System.out.println(String.format("线程%s,接收异步消息:%s", Thread.currentThread().getName(), new String(msg.getBody(), "UTF-8")));
                         else if (SDKSendMsg.DEFAULT_ONEWAY_MSG_KEY.equals(msg.getKeys())) 
                             System.out.println(String.format("线程%s,接收单向消息:%s", Thread.currentThread().getName(), new String(msg.getBody(), "UTF-8")));
                         
                      catch (UnsupportedEncodingException e) 
                         // TODO 补偿机制
                         System.out.println(e.getMessage());
                     
                 
                 // 消费消息确认
                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
             
 
         );
         // 启动消费者实例
         consumer.start();
         System.out.printf("消费者已启动.%n");
     
 

 

RocketMQ 消息集成:多类型业务消息-普通消息

引言

Apache RocketMQ 诞生至今,历经十余年大规模业务稳定性打磨,服务了 100% 阿里集团内部业务以及阿里云数以万计的企业客户。作为金融级可靠的业务消息方案,RocketMQ 从创建之初就一直专注于业务集成领域的异步通信能力构建。本篇将从业务集成场景的诉求开始,介绍 RocketMQ 作为业务消息集成方案的核心能力和优势,通过功能场景、应用案例以及最佳实践等角度介绍 RocketMQ 普通消息类型的使用。

说起业务集成场景,RocketMQ 最初的使用场景就是典型代表。RocketMQ 诞生于阿里的电商系统,电商系统经常需要做各种大促活动,在这类复杂需求场景下对消息系统的吞吐性能、端到端延迟、削峰填谷等能力有着极高的要求。

一句话概括今天的核心问题,跑在核心交易业务链路的消息有什么特点,有什么要求,和跑在离线分析等场景的消息有什么不同。下面和大家一起来探讨~

业务集成 vs 数据集成

集成目标不同

做业务核心架构设计时,很多时候需要面向上层需求去完成业务逻辑的设计。以电商交易场景为例,通过微服务的拆分,可能在整个链路中会拆成很多个环节,不同应用之间通过消息去集成时,更多的是关注用户订单的流转过程,关注这个业务逻辑是否会正常的处理,这个就是业务集成。

对比一下,数据集成是以数据为中心,更多的是关注业务集成产生的数据,去分析这些业务数据的价值。数据集成并不关心这个数据是从哪里来,只关心数据本身的属性和数据之间的关系。

关注重点不同

在业务集成里随着企业业务逻辑的拓宽和复杂度的提升,调用和被调用方之间的耦合性会逐步增加,链路的拓扑也会变得越来越复杂。经常会出现一条消息的上游是另一条消息的下游,一个服务可能既是发送方也是消费方,等等。

而在数据集成的场景里面,并不关注上述链路,更多是关注数据的多样性。也就是说,在做数据集成分析时,更多的是从各种异构的数据源里去提取、汇聚这些数据,然后把这些异构系统的数据聚合在一起做清洗,最终汇聚成结构化的数据或报表去做分析。数据集成更多是关注数据的异构性和多样性。

实时性不同

业务集成简单理解就是一种在线的逻辑,或者是一种强实时的逻辑。在这个业务集成领域,无论同步调用还是异步调用,都对调用和被调用之间的响应协同机制有一定的要求。举个例子,一个订单的处理必须是要在毫秒级完成,否则用户的体验会非常的差。

但是在数据集成领域,更多的可能是近实时甚至是离线非实时的场景,也就是说通过批、实时流或近实时流的 场景去爬取数据之后做分析,具体链路对于用户来说并不是可见的,这也是数据集成和业务集成侧重点的差异。

业务集成对消息系统的核心诉求

消息队列是企业业务集成的主要模式之一,它是一种异步通信模式。异步模式提供了低耦合、高可靠、可观测的异步通信能力。那么业务集成链路里使用消息之后会带来什么效果呢?这里稍微罗列一下。

上图就是一个比较典型的上层的应用链路,从应用 A 到下层的应用 B 的一个单链路,通过发送初始化或者结构化一个消息,作为调用事件发送到事件通道,这个通道就是消息系统,比如 RocketMQ、RabbitMQ 等。在时间通道里存储后通过过滤路由的分发组件匹配到下游,然后推送处理。与此同时,还会有可观测、运维、监控的一些体系去支撑这个链路的可靠运行。

完整的功能需求非常多,这里提炼业务集成对消息系统的四个核心诉求:

1)多类型消息传输:支持多样业务场景集成诉求,主要包括普通消息、定时消息、事务消息、顺序消息等;

2)丰富路由分发能力:支持多种分发路由条件,包括 Tag 过滤、消息属性过滤,一对多、一对一分发等;

3)多样交互模式:支持收发消息多样交互方式,支持同步、异步发送,支持主动消费、被动推送消费,支持流式应答、单条应答;

4)可观测体系:支持 Metrics、Trace、Events 分析,支持单链路、全链路轨迹追踪,支持 Metrics 分析和监控告警,支持系统运行事件、业务事件透出处理。

RocketMQ 作为非常典型的业务消息方案,正是对应上述业务集成的诉求,提供了完善的消息功能、丰富的客户端接口以及完善的可观测体系和稳定性保障机制。

接下来就开始逐步拆解 RocketMQ 的多类型消息,本篇主要介绍普通消息。

普通消息原理介绍

功能简介

在多种消息类型中,普通消息是最简单也最为重要。普通消息是 RocketMQ 的基本消息类型,提供高吞吐、扩展、低延迟、异步的通信能力。其他高级消息类型基本都是在这种普通消息类型的基础上叠加了独有的控制特性,或者是特定的使用的方式。

下面这张图就是普通消息的一个典型的拓扑,和消息队列典型场景一样,生产者发送消息,发送普通消息到服务端去存储,存储完之后,会把消息按照订阅关系的匹配,最后推送给下游的消费方去做消费。

普通消息的特点

1)原子性:消息之间没有关联关系,收发处理逻辑原子;

2)扩展性:普通消息容量、能力可扩展,支持多队列存储、水平拆分、并发消费;

3)低延迟:普通消息链路短,交互简单,状态简单,链路极简,毫秒级低延迟通信。

消息的生命周期

普通消息从初始化发送开始到最终被处理的过程中会经历多个状态和过程,而了解消息的生命周期,可以帮助我们去判断线上出现问题后如何快速定位和解决。

简单来说消息的生命周期可以抽象成五个状态:

  • 初始化:普通消息被生产者构建初始化完成,待发送到服务端的状态;
  • 待消费:消息被传输到服务端,对下游可见,等待消费者获取处理的状态;
  • 消费中:消息被消费者获取,并按照业务逻辑处理过程,此时服务端会等待消费完成,如果一定时间后没有收到消费提交的事件,消息还会重试处理;
  • 消费提交:消费者完成消息处理,并提交应答事件到服务端,服务端标记当前消息已经被处理(包括消费成功和失败)。RocketMQ默认支持所有消息保留,此时消息数据并不会立即被删除,只是逻辑标记完成,在消息被物理删除之前,消费者仍然可以回溯重新处理消息;
  • 消息删除:RocketMQ 按照消息保存时间机制滚动清理最早的消息数据,将消息从物理文件中删除。

普通消息应用场景和案例

简单的了解原理和基本介绍之后,那普通消息主要用在哪里呢?普通消息是RocketMQ应用最广泛,使用规模最大的一种消息类型,它主要集中在服务间的解耦调用,同时还有一些批量数据的采集传输等场景。

使用场景

1)微服务调用解耦

  • 异步化解耦:普通消息实现微服务异步调用,缩短业务流和响应时间。
  • 流量削峰填谷:普通消息海量堆积能力,解决流量峰值下游处理能力不足的稳定性风险。

2)实时数据传输

  • 高吞吐传输:普通消息可以实现无限水平扩展,数据传输吞吐高,解决采集上报问题。
  • 实时传输:普通消息实时传输投递,下游可以及时消费实现计算和分析。

案例介绍

1)场景简介

交易平台是买卖家在线上根据约定的契约完成钱货交换的过程涉及的系统。交易平台涉及到和支付、物流、下单、运营等多个子系统的交互大多使用 RocketMQ 普通消息做异步解耦,消息的可靠处理是电商大促保障的核心。

2)核心痛点

订单状态机复杂,需要缩短链路时间:订单生命周期长,涉及下游多个子系统流转,同步调用耗时长,用户体验差。

大促场景海量订单处理,下游压力大:大促场景订单流量大,各子系统处理能力不足导致系统崩溃。

分布式场景订单变化持久化和下游调用事务性:订单状态流转需要确保数据库状态变更和下游调用同时成功或者失败,即事务性。

快速上手收发消息

说了这么多场景和案例,直接看一下代码怎么用。

发送普通消息

发送消息的流程非常简单,但这其中需要注意以下几点:

  • 消息初始化应尽可能完整:普通消息初始化包括主题、Tag 标签、索引 Key 和负载。可以按实际情况设置完成。
  • 消息发送需要捕获结果和异常:消息发送完成需要获取响应结果,如果失败需要捕获异常并做重试处理。

消费普通消息

RocketMQ 支持的消费方式有多种,有主动获取的方式,也有被动消费监听器推送的方式。

被动消费方式只需要注册消费监听器,然后监听器内部去处理这个逻辑,最终返回消费结果。如果消费失败,希望 RocketMQ 再做重投,就要返回一个失败的结果;抛异常也是返回失败。类似于这样的结果,返回服务端就完成了整个消费的过程。

对于主动获取的方式,会更加灵活,由业务方主动调用获取消息,可以按照自己的速率和并发取消息,处理完成后,再回复 RocketMQ 服务端消费结果。

产品预告:新一代 RocketMQ 5.0 版实例

最后预告一下,阿里云消息队列 RocketMQ 版在八月份发布新版本,新版本具备更强弹性、更低成本、更易运维等特点,欢迎保持关注。

原文链接

本文为阿里云原创内容,未经允许不得转载。

以上是关于RocketMQ笔记:普通消息的主要内容,如果未能解决你的问题,请参考以下文章

深入理解RocketMq普通消息和顺序消息使用,原理,优化

RocketMQ 消息集成:多类型业务消息-普通消息

RocketMQ 消息集成:多类型业务消息-普通消息

RocketMQ 源码阅读 ---- 消息消费(普通消息)

RocketMQ 源码阅读 ---- 消息消费(普通消息)

Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息