RocketMQ 消费者监听模型 解析——图解源码级解析

Posted 小王曾是少年

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ 消费者监听模型 解析——图解源码级解析相关的知识,希望对你有一定的参考价值。

RocketMQ 存储优化技术 解析——图解源码级解析

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2023年1月13日

🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

RocketMQ里的存储优化

内存预分配

写入消息时,CommitLog会先从MappedFileQueue(队列)中获取一个MappedFileMappedFile对象的预分配过程如下图所示:

MappedFile的创建过程是将构建好的AllocateRequest请求添加至队列中,后台运行的AllocateMappedFileService服务线程根据队列里存在的请求,执行MappedFile映射文件的创建和预分配工作。

 static class AllocateRequest implements Comparable<AllocateMappedFileService.AllocateRequest> 
        private String filePath;
        private int fileSize;
        private CountDownLatch countDownLatch = new CountDownLatch(1);
        private volatile MappedFile mappedFile = null;

分配时有两种策略:

  1. 使用mmap的方式创建MappedFile实例
  2. TransientStorePool堆外内存池中获取相应的DirectByteBuffer来构建MappedFile实例

在创建并分配完成MappedFile实例后,系统还会同时将下一个MappedFile实例也预先创建出来并保存到请求队列里,下次请求时可以跳过创建MappedFile实例的时间,直接返回。


mlock系统调用

该系统调用的功能是可以将进程使用的部分或所有的地址空间锁定在物理内存里,防止其被交换到swap空间。

对于一款消息中间件来说,追求的一定是消息读写的低延时,因为内存页面调出调入的时间延迟可能太长或难以预知,所以就希望尽可能多地使用物理内存,以提高数据读写的效率。


文件预热

做预热主要是基于如下考虑:

  1. 仅仅分配内存并执行mlock系统调用之后并不会为程序完全锁定这些内存,其中的分页仍然可能是copy-on-write的,因此RocketMQ在创建MappedFile实例的时候,会先写入一些随机值到mmap映射出的内存空间里。

  1. 使用mmap进行内存映射之后,操作系统只是建立虚拟内存地址到物理地址的映射表,但没有加载任何文件至内存中。程序想要访问数据时,操作系统会检查该部分的分页是否已经在内存里,如果不在的话,则会发出一次缺页中断。RocketMQ在做mmap内存映射的同时进行madvise系统调用,目的是使操作系统做一次内存映射后对应的文件数据尽可能多地预加载进内存,从而实现预热。

x86 Linux中的一个标准页面大小为4KB,1G的commitLog需要发生256次缺页中断,才能将数据完全加载进物理内存中


存储模型

1. CommitLog

消息主体及元数据的存储主体,存储Producer端写入的消息主体内容。单个文件默认大小为1GB,文件名长度为20位

00000000000000000000表示第一个文件,起始偏移量为0,文件大小为1GB = 1073741824;00000000001073741824表示第二个文件,写入是顺序写

public class CommitLog 
    // Message's MAGIC CODE daa320a7
    public final static int MESSAGE_MAGIC_CODE = -626843481;
    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    // End of file empty MAGIC CODE cbd43194
    protected final static int BLANK_MAGIC_CODE = -875286124;
    protected final MappedFileQueue mappedFileQueue;
    protected final DefaultMessageStore defaultMessageStore;
    private final FlushCommitLogService flushCommitLogService;

    //If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
    private final FlushCommitLogService commitLogService;

    private final AppendMessageCallback appendMessageCallback;
    private final ThreadLocal<PutMessageThreadLocal> putMessageThreadLocal;
    protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
    protected volatile long confirmOffset = -1L;

    private volatile long beginTimeInLock = 0;

    protected final PutMessageLock putMessageLock;

2. ConsumeQueue

消息消费的逻辑队列,其中包含了这个MessageQueueCommitLog中的起始物理位置偏移量offset、消息实体内容的大小和Message Tag的哈希值。从实际物理存储来说,ConsumeQueue对应每个TopicQueueId下面的所有文件,每个文件默认大小为6MB,差不多可以存储30万条消息,也是顺序写入。

public class ConsumeQueue 
    private static final InternalLogger log = InternalLoggerFactory.getLogger("RocketmqStore");
    public static final int CQ_STORE_UNIT_SIZE = 20;
    private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger("RocketmqStoreError");
    private final DefaultMessageStore defaultMessageStore;
    private final MappedFileQueue mappedFileQueue;
    private final String topic;
    private final int queueId;
    private final ByteBuffer byteBufferIndex;
    private final String storePath;
    private final int mappedFileSize;
    private long maxPhysicOffset = -1L;
    private volatile long minLogicOffset = 0L;
    private ConsumeQueueExt consumeQueueExt = null;

3. IndexFile

用于为生成的索引文件提供访问,通过消息Key值查询消息真正的实体内容。在实际的物理存储上,文件名是以创建的时间戳命名的,一个IndexFile可以保存2000万个索引。

public class IndexFile 
    private static final InternalLogger log = InternalLoggerFactory.getLogger("RocketmqStore");
    private static int hashSlotSize = 4;
    private static int indexSize = 20;
    private static int invalidIndex = 0;
    private final int hashSlotNum;
    private final int indexNum;
    private final MappedFile mappedFile;
    private final FileChannel fileChannel;
    private final MappedByteBuffer mappedByteBuffer;
    private final IndexHeader indexHeader;

4. MappedFileQueue

对连续物理存储的封装类,代码中可以通过消息存储的物理偏移量快速定位offset对应的MappedFile

public class MappedFileQueue 
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);

    private static final int DELETE_FILES_BATCH_MAX = 10;

    private final String storePath;

    private final int mappedFileSize;

    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();

    private final AllocateMappedFileService allocateMappedFileService;

    private long flushedWhere = 0;
    private long committedWhere = 0;

    private volatile long storeTimestamp = 0;

5. MappedFile

文件存储的直接内存映射封装类,通过该类的实例,可以把消息写入PageCache,或者将消息刷盘。

public class MappedFile extends ReferenceResource 
    public static final int OS_PAGE_SIZE = 1024 * 4;
    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);

    private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
    protected int fileSize;
    protected FileChannel fileChannel;
    /**
     * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
     */
    protected ByteBuffer writeBuffer = null;
    protected TransientStorePool transientStorePool = null;
    private String fileName;
    private long fileFromOffset;
    private File file;
    private MappedByteBuffer mappedByteBuffer;
    private volatile long storeTimestamp = 0;
    private boolean firstCreateInQueue = false;


刷盘流程

同步刷盘


只有在消息成功写入磁盘之后,Broker才会给Producer返回一个ACK响应。RocketMQ中,主线程会首先创建一个刷盘请求实例GroupCommitRequest,并将其放到刷盘队列,之后使用同步刷盘线程GroupCommitService来执行刷盘动作。

GroupCommitService里使用到CountDownLatch来控制线程间同步

RocketMQ里还使用了两个队列分别负责读和写操作,实现读写分离,提高并发量

同步刷盘可以保障较好的一致性,一般适用于金融业务领域。


异步刷盘

只要消息写入到Pagecache之后就可以直接返回ACK给Producer,之后,后台异步线程负责将消息刷盘,此时主线程并不会阻塞,降低了读写延迟,从而达到提高MQ性能和吞吐量的目的。


以上是关于RocketMQ 消费者监听模型 解析——图解源码级解析的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ 存储优化技术 解析——图解源码级解析

RocketMQ 消息负载均衡策略解析——图解源码级解析

RocketMQ源码解析-消息消费

RocketMQ:消息ACK机制源码解析

深度挖掘RocketMQ底层源码「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(上篇)

精华推荐 | 深入浅出 RocketMQ原理及实战「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(上篇)