RocketMQ 详解
Posted 罗志宏
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ 详解相关的知识,希望对你有一定的参考价值。
RocketMQ 详解
前言:
RocketMQ 是阿里开源的分布式消息中间件,跟其它中间件相比,RocketMQ 的特点是纯JAVA实现
1. 基础概念
- Producer: 消息生产者,负责产生消息,一般由业务系统负责产生消息
- Producer Group:消息生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者
- Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费
- Consumer Group:消费者组,和生产者类似,消费同一类消息的多个 Consumer 实例组成一个消费者组
- Topic:主题,用于将消息按主题做划分,Producer将消息发往指定的Topic,Consumer订阅该Topic就可以收到这条消息
- Message:消息,每个message必须指定一个topic,Message 还有一个可选的 Tag 设置,以便消费端可以基于 Tag 进行过滤消息
- Tag:标签,子主题(二级分类)对topic的进一步细化,用于区分同一个主题下的不同业务的消息
- Broker:Broker是RocketMQ的核心模块,
负责接收并存储消息
,同时提供Push/Pull接口来将消息发送给Consumer。Broker同时提供消息查询的功能,可以通过MessageID和MessageKey来查询消息。Borker会将自己的Topic配置信息实时同步到NameServer - Queue:Topic和Queue是1对多的关系,一个Topic下可以包含多个Queue,主要用于负载均衡,Queue数量设置建议不要比消费者数少。发送消息时,用户只指定Topic,Producer会根据Topic的路由信息选择具体发到哪个Queue上。Consumer订阅消息时,会根据负载均衡策略决定订阅哪些Queue的消息
- Offset:RocketMQ在存储消息时会为每个Topic下的每个Queue生成一个消息的索引文件,每个Queue都对应一个Offset记录当前Queue中消息条数
- NameServer:NameServer可以看作是RocketMQ的注册中心,它管理两部分数据:集群的Topic-Queue的路由配置;Broker的实时配置信息。其它模块通过Nameserv提供的接口获取最新的Topic配置和路由信息;各 NameServer 之间不会互相通信, 各 NameServer 都有完整的路由信息,即无状态。
Producer/Consumer
:通过查询接口获取Topic对应的Broker的地址信息和Topic-Queue的路由配置Broker
: 注册配置信息到NameServer, 实时更新Topic信息到NameServer
2.RocketMQ 消费模式
2.1 广播模式
一条消息被多个Consumer消费,即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group中的每一个Consumer都消费一次。
consumer.setMessageModel(MessageModel.BROADCASTING);
2.2 集群模式
一个Consumer Group中的所有Consumer平均分摊消费消息(组内负载均衡)
consumer.setMessageModel(MessageModel.CLUSTERING);
3. 基础架构
rocketMq使用轻量级的NameServer服务进行服务的协调和治理工作,NameServer多节点部署时相互独立互不干扰。每一个rocketMq服务节点(broker节点)启动时都会遍历配置的NameServer列表并建立长链接,broker节点每30秒向NameServer发送一次心跳信息、NameServer每10秒会检查一次连接的broker是否存活。消费者和生产者会随机选择一个NameServer建立长连接,通过定期轮训更新的方式获取最新的服务信息。架构简图如下:
-
NameServer:启动,监听端口,等待producer,consumer,broker连接上来
-
Broker:启动,与nameserver保持长链接,定期向nameserver发送心跳信息,包含broker的ip,端口,当前broker上topic的信息
-
producer:启动,随机选择一个NameServer建立长连接,拿到broker的信息,然后就可以给broker发送消息了
-
consumer:启动,随机选择一个NameServer建立长连接,拿到broker的信息,然后就可以建立通道,消费消息
3.1 Broker 的存储结构
RocketMQ 存储用的是本地文件存储系统,将所有topic的消息全部写入同一个文件中(commit log),这样保证了IO写入的绝对顺序性,最大限度利用IO系统顺序读写带来的优势提升写入速度。
由于消息混合存储在一起,需要将每个消费者组消费topic最后的偏移量记录下来。这个文件就是consumer queue(索引文件)。所以消息在写入commit log 文件的同时还需将偏移量信息写入consumer queue文件。在索引文件中会记录消息的物理位置、偏移量offse,消息size等,消费者消费时根据上述信息就可以从commit log文件中快速找到消息信息。
Broker 存储结构如下:
3.2 存储文件简介
- Commit log:消息存储文件,rocket Mq会对commit log文件进行分割(默认大小1GB),新文件以消息最后一条消息的偏移量命名。(比如 00000000000000000000 代表了第一个文件,第二个文件名就是 00000000001073741824,表明起始偏移量为 1073741824)
- Consumer queue:消息消费队列(也是个文件),可以根据消费者数量设置多个,一个Topic 下的某个 Queue,每个文件约 5.72M,由 30w 条数据组成;ConsumeQueue 存储的条目是固定大小,只会存储 8 字节的 commitlog 物理偏移量,4 字节的消息长度和 8 字节 Tag 的哈希值,固定 20 字节;消费者是先从 ConsumeQueue 来得到消息真实的物理地址,然后再去 CommitLog 获取消息
- IndexFile:索引文件,是额外提供查找消息的手段,通过 Key 或者时间区间来查询对应的消息
整个流程简介:
Producer 使用轮询的方式分别向每个 Queue 中发送消息。
Consumer 启动的时候会在 Topic,Consumer group 维度发生负载均衡,为每个客户端分配需要处理的 Queue。负载均衡过程中每个客户端都获取到全部的的 ConsumerID 和所有 Queue 并进行排序,每个客户端使用相同负责均衡算法,例如平均分配的算法,这样每个客户端都会计算出自己需要消费那些 Queue,每当 Consumer 增加或减少就会触发负载均衡,所以我们可以通过 RocketMQ 负载均衡机制实现动态扩容,提升客户端收发消息能力。客户端负责均衡为客户端分配好 Queue 后,客户端会不断向 Broker 拉取消息,在客户端进行消费。
这里有个小问题:
可以一直增加客户端的数量提升消费能力吗?当然不可以,因为 Queue 数量有限,客户端数量一旦达到 Queue 数量,再扩容新节点无法提升消费能力,因为会有节点分配不到 Queue 而无法消费。
3.3 Consumer 端的负载均衡机制
topic 在创建之处可以设置 comsumer queue数量。而 comsumer 在启动时会和comsumer queue绑定,这个绑定策略是咋样的?
- 默认策略:
- queue 个数大于 Consumer个数, 那么 Consumer 会平均分配 queue,不够平均,会根据clientId排序来拿取余数
- queue个数小于Consumer个数,那么会有Consumer闲置,就是浪费掉了,其余Consumer平均分配到queue
- 一致性hash算法
- 就近元则,离的近的消费
- 每个消费者依次消费一个queue,环状
- 自定义方式
天然弊端:
RocketMQ 采用一个 consumer 绑定一个或者多个 Queue 模式,假如某个消费者服务器挂了,则会造成部分Queue消息堆积
3.4 消息刷盘机制
- 同步刷盘:当消息持久化完成后,Broker才会返回给Producer一个ACK响应,可以保证消息的可靠性,但是性能较低。
- 异步刷盘:只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了RocketMQ的性能和吞吐量。
3.5 Mmap + pageCache
RocketMQ 底层对 commitLog、consumeQueue 之类的磁盘文件的读写操作都采用了 mmap 技术。
3.5.1 传统缓存 IO 和 Mmap
传统缓存 IO:
传统 I/O 的工作方式是,数据读取和写入是从用户空间到内核空间来回复制,而内核空间的数据是通过操作系统层面的 I/O 接口从磁盘读取或写入。
传统IO发生了 4 次用户态与内核态的上下文切换,因为发生了两次系统调用,一次是 read()
,一次是 write()
,每次系统调用都得先从用户态切换到内核态,等内核完成任务后,再从内核态切换回用户态。
其次,还发生了 4 次数据拷贝,其中两次是 DMA 的拷贝,另外两次则是通过 CPU 拷贝的
传统IO,write() 过程是怎样?
wirte() 写请求 和 read(),需要先写入用户缓存区,然后通过系统调用,CPU 拷贝数据从用户缓存区到内核缓存区,再从内核缓存区拷贝到磁盘文件!
简述上述过程:
- 第一次拷贝,把磁盘上的数据拷贝到操作系统内核的缓冲区里,这个拷贝的过程是通过 DMA 搬运的
- 第二次拷贝,把内核缓冲区的数据拷贝到用户的缓冲区里,于是我们应用程序就可以使用这部分数据了,这个拷贝到过程是由 CPU 完成的(用户态不能直接操作内核态缓存区,所以需要拷贝到用户态才能使用)
- 第三次拷贝,把刚才拷贝到用户的缓冲区里的数据,再拷贝到内核的 socket 的缓冲区里,这个过程依然还是由 CPU 搬运的
- 第四次拷贝,把内核的 socket 缓冲区里的数据,拷贝到网卡的缓冲区里,这个过程又是由 DMA 搬运的
因为文件传输的应用场景中,在用户空间我们并不会对数据「再加工」,所以数据实际上可以不用搬运到用户空间,因此用户的缓冲区是没有必要存在的。
Mmap(内存映射):
read()
系统调用的过程中会把内核缓冲区的数据拷贝到用户的缓冲区里,于是为了减少这一步开销,我们可以用 mmap()
替换 read()
系统调用函数。
mmap()
系统调用函数会把文件磁盘地址「映射」到内核缓存区(page cache),而内核缓存区会 「映射」到用户空间(虚拟地址)。这样,操作系统内核与用户空间就不需要再进行任何的数据拷贝。
注意,这里用户空间(虚拟地址)不是直接映射到文件磁盘地址,而是文件对应的 page cache,用户虚拟地址一般是和用户内存地址「映射」的,如果使用内存映射技术,则用户虚拟地址可以和内核内存地址「映射」。
根据维基百科给出的定义:在大多数操作系统中,映射的内存区域实际上是内核的page cache,这意味着不需要在用户空间创建副本。多个进程之间也可以通过同时映射 page cache,来进行进程通信)
mmap() 函数简介:
void * mmap(void *start, size_t length, int prot , int flags, int fd, off_t offset)
- start:要映射到的内存区域的起始地址,通常都是用NULL(NULL即为0)。NULL表示由内核来指定该内存地址
- offset:以文件开始处的偏移量, 必须是分页大小的整数倍, 通常为0, 表示从文件头开始映射
- length:将文件的多大长度映射到内存(每次创建新 commitlog 会默认指定长度 1GB)
- prot: 映射区的保护方式(PROT_EXEC: 映射区可被执行、PROT_READ: 映射区可被读取、PROT_WRITE: 映射区可被写入、PROT_NONE: 映射区不能存取)
- flags: 映射区的特性
- fd:文件描述符(由open函数返回)
从磁盘拷贝到内核空间的页缓存 (page cache),然后将用户空间的虚拟地址映射到内核的page cache,这样不需要再将页面从内核空间拷贝到用户空间了。
简述上述过程:
- 应用进程调用了 mmap() 后,DMA 会把磁盘的数据拷贝到内核的缓冲区里。接着,应用进程跟操作系统内核「共享」这个缓冲区;
- 应用进程再调用 write(),操作系统直接将内核缓冲区的数据拷贝到 socket 缓冲区中,这一切都发生在内核态,由 CPU 来搬运数据;
- 应用进程再调用 write(),操作系统直接将内核缓冲区的数据拷贝到 socket 缓冲区中,这一切都发生在内核态,由 CPU 来搬运数据;
最后,把内核的 socket 缓冲区里的数据,拷贝到网卡的缓冲区里,这个过程是由 DMA 搬运的。
使用 mmap() 写数据到磁盘文件会怎样?
mmap() 将用户虚拟地址映射内核缓存区(内存物理地址)后,写数据直接将数据写入内核缓存区,只需要经过一次CPU拷贝,将数据从内核缓存区拷贝到磁盘文件;比传统 IO 的 write() 操作少了一次数据拷贝的过程!
3.5.2 pageCache
在传统IO过程中,其中第一步都是先需要先把磁盘文件数据拷贝「内核缓冲区」里,这个「内核缓冲区」实际上是磁盘高速缓存(PageCache)。
3.5.3 预映射机制 + 文件预热机制
接着给大家说几个Broker针对上述的磁盘文件高性能读写机制做的一些优化:
-
内存预映射机制:Broker 会针对磁盘上的各种 CommitLog、ConsumeQueue 文件预先分配好MappedFile,也就是提前对一些可能接下来要读写的磁盘文件,提前使用 MappedByteBuffer 执行 mmap() 函数完成内存映射,这样后续读写文件的时候,就可以直接执行了(减少一次 CPU 拷贝)。
-
文件预热:在提前对一些文件完成内存映射之后,因为内存映射不会直接将数据从磁盘加载到内存里来,那么后续在读,取尤其是 CommitLog、ConsumeQueue 文件时候,其实有可能会频繁的从磁盘里加载数据到内存中去。所以,在执行完 mmap() 函数之后,还会进行 madvise() 系统调用,就是提前尽可能将磁盘文件加载到内存里去。(读磁盘 -> 读内存)
3.6 push/pull/pop
// TODO
3.7 Topic 分片
为了突破单个机器容量上限和单个机器读写性能,RocketMQ 支持 topic 数据分片
架构图如下:
3.8 查漏补缺
3.8.1 消息的全局顺序和局部顺序
- 全局顺序:一个 Topic 一个队列,Producer 和 Consuemr 的并发都为一。
- 局部顺序:某个队列消息是顺序的
3.8.2 零拷贝(Zero-copy)和mmap
详细请看这篇文章: 零拷贝(Zero-copy)和mmap
RocketMQ——角色与术语详解
原文地址:http://jaskey.github.io/blog/2016/12/15/rocketmq-concept/
RocketMQ——角色与术语详解
2016-12-15 THU 15:48
RocketMQ中有很多概念,其中包括一些术语和角色。
理清楚基本的概念能有效的帮助理解RocketMQ的原理以及排查问题。
角色:
Producer
生产者。发送消息的客户端角色。发送消息的时候需要指定Topic。
Consumer
消费者。消费消息的客户端角色。通常是后台处理异步消费的系统。 RocketMQ中Consumer有两种实现:PushConsumer和PullConsumer。
PushConsumer
推送模式(虽然RocketMQ使用的是长轮询)的消费者。消息的能及时被消费。使用非常简单,内部已处理如线程池消费、流控、负载均衡、异常处理等等的各种场景。
PullConsumer
拉取模式的消费者。应用主动控制拉取的时机,怎么拉取,怎么消费等。主动权更高。但要自己处理各种场景。
概念术语
Producer Group
标识发送同一类消息的Producer,通常发送逻辑一致。发送普通消息的时候,仅标识使用,并无特别用处。若事务消息,如果某条发送某条消息的producer-A宕机,使得事务消息一直处于PREPARED状态并超时,则broker会回查同一个group的其 他producer,确认这条消息应该commit还是rollback。但开源版本并不支持事务消息。
Consumer Group
标识一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致。同一个Consumer Group下的各个实例将共同消费topic的消息,起到负载均衡的作用。
消费进度以Consumer Group为粒度管理,不同Consumer Group之间消费进度彼此不受影响,即消息A被Consumer Group1消费过,也会再给Consumer Group2消费。
注: RocketMQ要求同一个Consumer Group的消费者必须要拥有相同的注册信息,即必须要听一样的topic(并且tag也一样)。
Topic
标识一类消息的逻辑名字,消息的逻辑管理单位。无论消息生产还是消费,都需要指定Topic。
Tag
RocketMQ支持给在发送的时候给topic打tag,同一个topic的消息虽然逻辑管理是一样的。但是消费topic1的时候,如果你订阅的时候指定的是tagA,那么tagB的消息将不会投递。
Message Queue
简称Queue或Q。消息物理管理单位。一个Topic将有若干个Q。若Topic同时创建在不通的Broker,则不同的broker上都有若干Q,消息将物理地存储落在不同Broker结点上,具有水平扩展的能力。
无论生产者还是消费者,实际的生产和消费都是针对Q级别。例如Producer发送消息的时候,会预先选择(默认轮询)好该Topic下面的某一条Q地发送;Consumer消费的时候也会负载均衡地分配若干个Q,只拉取对应Q的消息。
每一条message queue均对应一个文件,这个文件存储了实际消息的索引信息。并且即使文件被删除,也能通过实际纯粹的消息文件(commit log)恢复回来。
Offset
RocketMQ中,有很多offset的概念。但通常我们只关心暴露到客户端的offset。一般我们不特指的话,就是指逻辑Message Queue下面的offset。
注: 逻辑offset的概念在RocketMQ中字面意思实际上和真正的意思有一定差别,这点在设计上显得有点混乱。祥见下面的解释。
可以认为一条逻辑的message queue是无限长的数组。一条消息进来下标就会涨1,而这个数组的下标就是offset。
max offset
字面上可以理解为这是标识message queue中的max offset表示消息的最大offset。但是从源码上看,这个offset实际上是最新消息的offset+1,即:下一条消息的offset。
min offset:
标识现存在的最小offset。而由于消息存储一段时间后,消费会被物理地从磁盘删除,message queue的min offset也就对应增长。这意味着比min offset要小的那些消息已经不在broker上了,无法被消费。
consumer offset
字面上,可以理解为标记Consumer Group在一条逻辑Message Queue上,消息消费到哪里即消费进度。但从源码上看,这个数值是消费过的最新消费的消息offset+1,即实际上表示的是下次拉取的offset位置。
消费者拉取消息的时候需要指定offset,broker不主动推送消息, offset的消息返回给客户端。
consumer刚启动的时候会获取持久化的consumer offset,用以决定从哪里开始消费,consumer以此发起第一次请求。
每次消息消费成功后,这个offset在会先更新到内存,而后定时持久化。在集群消费模式下,会同步持久化到broker,而在广播模式下,则会持久化到本地文件。
集群消费
消费者的一种消费模式。一个Consumer Group中的各个Consumer实例分摊去消费消息,即一条消息只会投递到一个Consumer Group下面的一个实例。
实际上,每个Consumer是平均分摊Message Queue的去做拉取消费。例如某个Topic有3条Q,其中一个Consumer Group 有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中的1条Q。
而由Producer发送消息的时候是轮询所有的Q,所以消息会平均散落在不同的Q上,可以认为Q上的消息是平均的。那么实例也就平均地消费消息了。
这种模式下,消费进度的存储会持久化到Broker。
广播消费
消费者的一种消费模式。消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即即使这些 Consumer 属于同一个Consumer Group,消息也会被Consumer Group 中的每个Consumer都消费一次。
实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。
这种模式下,消费进度会存储持久化到实例本地。
顺序消息
消费消息的顺序要同发送消息的顺序一致。由于Consumer消费消息的时候是针对Message Queue顺序拉取并开始消费,且一条Message Queue只会给一个消费者(集群模式下),所以能够保证同一个消费者实例对于Q上消息的消费是顺序地开始消费(不一定顺序消费完成,因为消费可能并行)。
在RocketMQ中,顺序消费主要指的是都是Queue级别的局部顺序。这一类消息为满足顺序性,必须Producer单线程顺序发送,且发送到同一个队列,这样Consumer就可以按照Producer发送的顺序去消费消息。
生产者发送的时候可以用MessageQueueSelector为某一批消息(通常是有相同的唯一标示id)选择同一个Queue,则这一批消息的消费将是顺序消息(并由同一个consumer完成消息)。或者Message Queue的数量只有1,但这样消费的实例只能有一个,多出来的实例都会空跑。
普通顺序消息
顺序消息的一种,正常情况下可以保证完全的顺序消息,但是一旦发生异常,Broker宕机或重启,由于队列总数发生发化,消费者会触发负载均衡,而默认地负载均衡算法采取哈希取模平均,这样负载均衡分配到定位的队列会发化,使得队列可能分配到别的实例上,则会短暂地出现消息顺序不一致。
如果业务能容忍在集群异常情况(如某个 Broker 宕机或者重启)下,消息短暂的乱序,使用普通顺序方式比较合适。
严格顺序消息
顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式 Failover 特性,即 Broker集群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。
如果服务器部署为同步双写模式,此缺陷可通过备机自动切换为主避免,不过仍然会存在几分钟的服务不可用。(依赖同步双写,主备自动切换,自动切换功能目前并未实现)
目前已知的应用只有数据库 binlog 同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推荐使用普通的顺序消息
以上是关于RocketMQ 详解的主要内容,如果未能解决你的问题,请参考以下文章
RocketMQ 详解系列
详解文件IO系列讲讲 MQ 消息中间件 (Kafka,RocketMQ等)与 MMAPPageCache 的故事...
消息中间件 - RocketMQ 详解(从软件安装到案例实现)
消息中间件 - RocketMQ 详解(从软件安装到案例实现)
消息中间件 - RocketMQ 详解(从软件安装到案例实现)
rocketmq详解