RocketMQ简介及实践

Posted helloxc

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ简介及实践相关的知识,希望对你有一定的参考价值。

What is RocketMQ

Apache RocketMQ是一个分布式消息传递和流平台,具有低延迟,高性能和可靠性,万亿级容量和灵活的可扩展性。 它由四部分组成:NamerServer,Broker,Produer和Customer。 它们中的每一个都可以水平扩展而没有单一的故障点。 如下面的截图所示。

技术图片

 

NameServer Cluster

NameServers提供轻量级服务发现和路由。 每个NameServer记录完整的路由信息,提供相应的读写服务,并支持快速存储扩展。

NameServer是一个几乎无状态的节点,可集群部署,节点之间无任何信息同步。

将NameServer地址列表提供给客户端有四种方法:

  • 编程方式,例如 producer.setNamesrvAddr("ip:port")
  • Java选项,使用 rocketmq.namesrv.addr
  • 环境变量,使用 NAMESRV_ADDR
  • HTTP端点。

Broker Cluster

Brokers通过提供轻量级的TOPIC和QUEUE机制来处理消息存储。 它们支持Push和Pull模型,包含容错机制(2个副本或3个副本),并提供强大的峰值填充和按原始时间顺序累积数千亿条消息的能力。 此外,Brokers还提供灾难恢复,丰富的指标统计和警报机制,这些都是传统的消息传递系统所缺少的。

Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slaver,但是一个Slaver只能对应一个Master,Master与Slaver的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slaver。Master可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有的NameServer。

如下图所示,Broker有几个重要的子模块组成:

  • 远程处理模块,即Broker的入口,处理来自客户端的请求
  • 客户端管理模块,管理客户端(生产者/消费者)并维护消费者的主题订阅
  • 存储服务,提供简单的API来存储或查询物理磁盘中的消息
  • HA服务,提供主从broker之间的数据同步功能
  • 索引服务,按指定的key来为消息创建索引并提供快速消息查询

技术图片

 

Producer Cluster

Produers支持分布式部署。 Distributed Producers通过多种负载均衡模式向Broker集群发送消息。发送过程支持快速故障并具有低延迟。
 
Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Produce完全无状态。

Customers Cluster

Customers也支持Push和Pull两种模式的分布式部署。 它还支持群集消费和消息广播。 它提供实时消息订阅机制,可以满足大多数消费者的需求。 

Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slaver建立长连接,且定时向Master、Slaver发送心跳。Consumer即可从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

 

Best Practices

Produer最佳实践

发送消息注意事项

1.一个应用尽可能用一个 Topic,消息子类型用 tags 来标识,tags 可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用 tags 在 broker 做消息过滤。

2.每个消息在业务层面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过 topic,key 来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。

3.消息发送成功或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段。

4.send 消息方法,只要不抛异常,就代表发送成功。但是发送成功会有多个状态:

  • SEND_OK:消息发送成功
  • FLUSH_DISK_TIMEOUT:消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
  • FLUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步到 Slave 时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
  • SLAVE_NOT_AVAILABLE:消息发送成功,但是此时 slave 不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失。对于精确发送顺序消息的应用,由于顺序消息的局限性,可能会涉及到主备自动切换问题,所以如果sendresult 中的 status 字段不等于 SEND_OK,就应该尝试重试。对于其他应用,则没有必要这样

5.对于消息不可丢失应用,务必要有消息重发机制

消息发送失败处理

Producer 的 send 方法本身支持内部重试,重试逻辑如下:

  1. 至多重试 3 次
  2. 如果发送失败,则轮转到下一个 Broker 
  3. 这个方法的总耗时时间不超过 sendMsgTimeout 设置的值,默认 10s所以,如果本身向 broker 发送消息产生超时异常,就不会再做重试

如果调用 send 同步方法发送失败,则尝试将消息存储到 db,由后台线程定时重试,保证消息一定到达 Broker。

选择 oneway 形式发送

一个 RPC 调用,通常是这样一个过程

  1. 客户端发送请求到服务器
  2. 服务器处理该请求
  3. 服务器向客户端返回应答

所以一个 RPC 的耗时时间是上述三个步骤的总和,而某些场景要求耗时非常短,但是对可靠性要求并不高,例如日志收集类应用,此类应用可以采用 oneway 形式调用,oneway 形式只发送请求不等待应答,而发送请求在客户端实现层面仅仅是一个 os 系统调用的开销,即将数据写入客户端的 socket 缓冲区,此过程耗时通常在微秒级。RocketMQ不止可以直接推送消息,在消费端注册监听器进行监听,还可以由消费端决定自己去拉取数据。

 

Consumer最佳实践

消费过程要做到幂等

RocketMQ无法做到消息重复,所以如果业务对消息重复非常敏感,务必要在业务层面去重。将消息的唯一键,可以是MsgId,也可以是消息内容中的唯一标识字段,例如订单ID,消费之前判断是否在DB或Tair(全局KV存储)中存在,如果不存在则插入,并消费,否则跳过。(实践过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过) msgid一定是全局唯一的标识符,但是可能会存在同样的消息有两个不同的msgid的情况(有多种原因),这种情况可能会使业务上重复,建议最好使用消息体中的唯一标识字段去重。

 

批量方式消费

如果业务流程支持批量方式消费,则可以很大程度上的提高吞吐量,可以通过设置Consumer的consumerMessageBatchMaxSize参数,默认是1,即一次消费一条。

跳过非重要的消息

发生消息堆积时,如果消费速度一直跟不上发送速度,可以选择丢弃不重要的消息。例如当前offset和maxOffset差值过大时(可能时因为消息系统堆积),直接把当前消息消费成功,可以快速使消息的消费和发出达到平衡。

优化消息消费过程

根据实际业务需要,尽可能的优化代码,减少DB访问数量,进而减少RT,提高消息的消费速度。

 

顺序消息

RocketMQ通过轮询所有队列的方式来确定消息被发送到哪一个队列(负载均衡策略)。比如下面的示例中,订单号相同的消息会被先后发送到同一个队列中:

// RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上
// RocketMQ默认提供了两种MessageQueueSelector实现:随机/Hash
// 当然你可以根据业务实现自己的MessageQueueSelector来决定消息按照何种策略发送到消息队列中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);

 

事务消息

RocketMQ除了支持普通消息,顺序消息,另外还支持事务消息。

MQ与DB一致性

A(存在DB操作)、B(存在DB操作)两方需要保证分布式事务一致性,通过引入中间层MQ,A和MQ保持事务一致性(异常情况下通过MQ反查A接口实现check),B和MQ保证事务一致(通过重试),从而达到最终事务一致性。

技术图片

上面以DB为例,其实此处可以是任何业务或者数据源。

TransactionCheckListener 是在消息的commit或者rollback消息丢失的情况下才会回调(上图中灰色部分)。这种消息丢失只存在于断网或者rocketmq集群挂了的情况下。当rocketmq集群挂了,如果采用异步刷盘,存在1s内数据丢失风险,异步刷盘场景下保障事务没有意义。所以如果要核心业务用Rocketmq解决分布式事务问题,建议选择同步刷盘模式。

多系统之间数据一致性技术图片

当需要保证多方(超过2方)的分布式一致性,上面的两方事务一致性(通过Rocketmq的事务性消息解决)已经无法支持。这个时候需要引入TCC模式思想。

技术图片

以上图交易系统为例:

1)交易系统创建订单(往DB插入一条记录),同时发送订单创建消息。通过RocketMq事务性消息保证一致性

2)接着执行完成订单所需的同步核心RPC服务(非核心的系统通过监听MQ消息自行处理,处理结果不会影响交易状态)。执行成功更改订单状态,同时发送MQ消息。

3)交易系统接受自己发送的订单创建消息,通过定时调度系统创建延时回滚任务(或者使用RocketMq的重试功能,设置第二次发送时间为定时任务的延迟创建时间。在非消息堵塞的情况下,消息第一次到达延迟为1ms左右,这时可能RPC还未执行完,订单状态还未设置为完成,第二次消费时间可以指定)。延迟任务先通过查询订单状态判断订单是否完成,完成则不创建回滚任务,否则创建。 PS:多个RPC可以创建一个回滚任务,通过一个消费组接受一次消息就可以;也可以通过创建多个消费组,一个消息消费多次,每次消费创建一个RPC的回滚任务。 回滚任务失败,通过MQ的重发来重试。

以上是交易系统和其他系统之间保持最终一致性的解决方案。



 

 
 
参考:
http://rocketmq.apache.org/docs/rmq-arc/
https://yq.aliyun.com/articles/624207?utm_content=m_1000012577
https://www.jianshu.com/p/2838890f3284
https://www.jianshu.com/p/453c6e7ff81c
 

 

以上是关于RocketMQ简介及实践的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ 事务消息 原理及使用方法解析

RocketMQ Broker消息处理流程及部分源码解析

逆向及Bof基础实践

《RocketMQ实战专栏》为什么是你学习RocketMQ的最佳资料

喜马拉雅 Apache RocketMQ 消息治理实践

同程旅行基于 RocketMQ 高可用架构实践