rocketmq源码心得

Posted 路飞beat

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rocketmq源码心得相关的知识,希望对你有一定的参考价值。


对于任意一个消息中间件来说,消息都需要经过消息生产->消息存储->消息消费三个过程。其中消息生产包括了Producer端的消息构造和消息发送,消息存储包括了Broker端的消息接收和消息落地,消息消费包括了Consumer端的消息接收和消息处理。不同消息中间件之所以功能不同,性能差异较大,其主要原因就是消息生命周期中各个阶段的处理方式不同。

消息生产:


消息的生产者Producer在发送消息时,会从Name Server上获取消息的目的地(Topic)在各个Broker上的状态,如果发现同一个Broker下的Topic有多个Queue(队列),则会根据RoundBin算法依次向每个Queue发送消息,此外,如果发现多个Broker上均有相同Topic,也会依照轮询的方式依次向这些Broker发送消息。


消息存储:


存储层的结构如下图所示,业务层均通过DefaultMessageStore类提供的方法作为统一入口访问底层文件。RocketMQ底层有6类文件,对于不同类型的文件在存储逻辑层采用不同的类提供服务,其中三类大文件:Index文件由IndexService类提供服务,consumequeue文件由Consumequeue类提供服务,commitlog文件由CommitLog类提供服务,还有两类小文件:checkpoint文件由StoreCheckPoint类提供访问服务、config目录下面的配置文件(以json格式存储)由ConfigMananger类提供访问服务; 

对于index/consumequeue/commitlog这三类大文件,为了提供读写性能,底层采用Java.nio.MappedByteBuffer类,该类是文件内存映射方案,支持随机读/顺序写操作,为了便于存储逻辑层操作大文件,将该类封装成MapedFile类;对于每类大文件,在存储时分隔成多个固定大小的文件,其中每分隔的文件的文件名为前面所有文件的大小+1,即为文件的起始偏移量,从而实现了整个大文件的串联,每个固定大小的文件均由MapedFile类提供操作服务;MapedFile类提供了顺序写、随机读、内存数据刷盘、内存清理等与文件相关的服务。




消息刷盘操作(commit) 

主要功能是将内存中的消息写入磁盘文件中。方法是commit(final int flushLeastPages)。主要逻辑: 

1)检查文件是否写满了,即写入位置(wrotePostion)是否等于文件大小(fileSize),若已经写满则进行刷盘操作; 

2)检查内存中未刷的消息页数是否大于最小刷盘页数,即要刷盘的消息页数flushLeastPages(每页默认大小为4K)是否大于【committedPosition(上次刷盘的位置)减去wrotePostion】/4K;若不够页数也暂不刷盘; 

3)MapedFile的父类是ReferenceResource,该父类的作用是记录该MapedFile中的内存对象被引用的次数;该引用次数为正数表示资源可用即未被shutdown,当在刷盘之前将引用次数加1后为正数,则调用BtyeBuffer的force方法进行刷盘,再将committedPosition置为wrotePostion值;最后将引用次数减1;若引起次数为不为正数则将直接committedPosition置为wrotePostion值



每个queue都有一个对应consumequeue文件

consumequeue中存放的是一串20字节定长的二进制数据,顺序读顺序写

commitlog是消息存放的物理文件,每台broker上的commitLog被本机所有的queue共享,不做区分,消息存储单元长度不固定,文件顺序写,随机读,简而言之,broker端接收一条消息后,如果消息需要落盘,则会在commitlog写入整条消息,并在consumequeue写入该消息的索引信息,消息被消费时,根据consumequeue中的信息去commitlog去获取信息,rocketmq在消息被消费

后,并不会去conmmitlog中删除数据,而是保存三天(可以进行配置)而且批量删除。rocketmq支持同步刷盘和异步刷盘,同步值producer将消息送至broker后,等待消息存入commitlog和consumequeue后,才算发送成功,异步刷盘则是将消息发送至broker后,broker将消息放入内存则告知producer消息发送成功,而后由broker自行将内存中的消息批量刷入磁盘;


consumer端的读取消息是首先找到对应的consumequeue再跟进commitlog的偏移量找到commitlog中的消息


1.根据topic和queueId来组织文件,图中TopicA有两个队列0,1,那么TopicA和QueueId=0组成一个ConsumeQueue,TopicA和QueueId=1组成另一个ConsumeQueue。

2.按照消费端的GroupName来分组重试队列,如果消费端消费失败,消息将被发往重试队列中,比如图中的%RETRY%ConsumerGroupA。

3.按照消费端的GroupName来分组死信队列,如果消费端消费失败,并重试指定次数后,仍然失败,则发往死信队列,比如图中的%DLQ%ConsumerGroupA。


consumerQueue结构如下

  1. CommitLog Offset是指这条消息在Commit Log文件中的实际偏移量

  2. Size存储中消息的大小

  3. Message Tag HashCode存储消息的Tag的哈希值:主要用于订阅时消息过滤(订阅时如果指定了Tag,会根据HashCode来快速查找到订阅的消息)


Commit Log

CommitLog:消息存放的物理文件,每台broker上的commitlog被本机所有的queue共享,不做任何区分。

CommitLog的消息存储单元长度不固定,文件顺序写,随机读。


三.消息消费:

RocketMQ消息订阅有两种模式,一种是Push模式,即Broker主动向消费端推送;另外一种是Pull模式,即消费端在需要时,主动到Broker拉取。但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式。


Consumer端每隔一段时间主动向broker发送拉消息请求,broker在收到Pull请求后,如果有消息就立即返回数据,Consumer端收到返回的消息后,再回调消费者设置的Listener方法。如果broker在收到Pull请求时,消息队列里没有数据,broker端会阻塞请求直到有数据传递或超时才返回。

与Kafka类似,Kafka中Consumer数量不能大于Partition数量,而在RocketMQ中Consumer的数量也不能大于队列(Queue)的数量,如果Consumer超过队列数量,那么多余的Consumer将不能消费消息。可以简单理解为queue与consumer的关系是多对一的关系。





顺序消息如何实现

producer端将同一个订单的消息顺序发送到同一个队列

new MessageQueueSelector() {

                    @Override

                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

                        Integer id = (Integer) arg;

                        return mqs.get(id);

                    }

                }





consumer端单线程顺序消费


consumer.registerMessageListener(new MessageListenerOrderly() {

 

            Random random = new Random();

 

            @Override

            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {

                context.setAutoCommit(true);

                System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );

                for (MessageExt msg: msgs) {

                    System.out.println(msg + ", content:" + new String(msg.getBody()));

                }

                try {

                    //模拟业务逻辑处理中...

                    TimeUnit.SECONDS.sleep(random.nextInt(10));

                } catch (Exception e) {

                    e.printStackTrace();

                }

                return ConsumeOrderlyStatus.SUCCESS;

            }

     

           });



事务型消息TransactionProducer类例子

public class TransactionProducer {

    public static void main(String[] args) throws MQClientException, InterruptedException {

        /**

         * [transactionCheckListener 监听处理 当RocketMQ发现`Prepared消息`时,会根据这个Listener实现的策略来决断事务]

         * @type {TransactionCheckListenerImpl}

         */

        TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();

        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");

        producer.setCheckThreadPoolMinSize(2);

        producer.setCheckThreadPoolMaxSize(2);

        producer.setCheckRequestHoldMax(2000);

        producer.setTransactionCheckListener(transactionCheckListener);

        producer.start();


        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};

        /**

         * [tranExecuter 本地事务的处理逻辑也就是发送方的业务处理]

         * @type {TransactionExecuterImpl}

         */

        TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();

        for (int i = 0; i < 100; i++) {

            try {

                Message msg =

                    new Message("TopicTest", tags[i % tags.length], "KEY" + i,

                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

                /**

                 * [sendResult 消息发送]

                 * @type {[type]}

                 */

                SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);

                System.out.printf("%s%n", sendResult);


                Thread.sleep(10);

            } catch (MQClientException | UnsupportedEncodingException e) {

                e.printStackTrace();

            }

        }


        for (int i = 0; i < 100000; i++) {

            Thread.sleep(1000);

        }

        producer.shutdown();

    }

}

接下来看下sendMessageInTransaction方法;

public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg)

        throws MQClientException {

        。。。

        switch (sendResult.getSendStatus()) {

            case SEND_OK: {

                try {

                    if (sendResult.getTransactionId() != null) {

                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());

                    }

                    /**

                     * [localTransactionState 如果消息发送成功,处理与消息关联的本地事务单元]

                     * @type {[type]}

                     */

                    localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);

                    if (null == localTransactionState) {

                        localTransactionState = LocalTransactionState.UNKNOW;

                    }


                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {

                        log.info("executeLocalTransactionBranch return {}", localTransactionState);

                        log.info(msg.toString());

                    }

                } catch (Throwable e) {

                    log.info("executeLocalTransactionBranch exception", e);

                    log.info(msg.toString());

                    localException = e;

                }

            }

            break;

            case FLUSH_DISK_TIMEOUT:

            case FLUSH_SLAVE_TIMEOUT:

            case SLAVE_NOT_AVAILABLE:

                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;

                break;

            default:

                break;

        }


        try {

            /**

             * 事务结束

             */

            this.endTransaction(sendResult, localTransactionState, localException);

        } catch (Exception e) {

            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);

        }


        。。。

    }



以上是关于rocketmq源码心得的主要内容,如果未能解决你的问题,请参考以下文章

3RocketMQ 源码解析之 源代码环境搭建

3RocketMQ 源码解析之 源代码环境搭建

3RocketMQ 源码解析之 源代码环境搭建

RocketMQ源码分析之从官方示例窥探:RocketMQ事务消息实现基本思想

原来RocketMQ中间件可以这么玩!面试心得体会

eclipse获取RocketMQ源码