rocketmq源码心得
Posted 路飞beat
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rocketmq源码心得相关的知识,希望对你有一定的参考价值。
对于任意一个消息中间件来说,消息都需要经过消息生产->消息存储->消息消费三个过程。其中消息生产包括了Producer端的消息构造和消息发送,消息存储包括了Broker端的消息接收和消息落地,消息消费包括了Consumer端的消息接收和消息处理。不同消息中间件之所以功能不同,性能差异较大,其主要原因就是消息生命周期中各个阶段的处理方式不同。
消息生产:
消息的生产者Producer在发送消息时,会从Name Server上获取消息的目的地(Topic)在各个Broker上的状态,如果发现同一个Broker下的Topic有多个Queue(队列),则会根据RoundBin算法依次向每个Queue发送消息,此外,如果发现多个Broker上均有相同Topic,也会依照轮询的方式依次向这些Broker发送消息。
消息存储:
存储层的结构如下图所示,业务层均通过DefaultMessageStore类提供的方法作为统一入口访问底层文件。RocketMQ底层有6类文件,对于不同类型的文件在存储逻辑层采用不同的类提供服务,其中三类大文件:Index文件由IndexService类提供服务,consumequeue文件由Consumequeue类提供服务,commitlog文件由CommitLog类提供服务,还有两类小文件:checkpoint文件由StoreCheckPoint类提供访问服务、config目录下面的配置文件(以json格式存储)由ConfigMananger类提供访问服务;
对于index/consumequeue/commitlog这三类大文件,为了提供读写性能,底层采用Java.nio.MappedByteBuffer类,该类是文件内存映射方案,支持随机读/顺序写操作,为了便于存储逻辑层操作大文件,将该类封装成MapedFile类;对于每类大文件,在存储时分隔成多个固定大小的文件,其中每分隔的文件的文件名为前面所有文件的大小+1,即为文件的起始偏移量,从而实现了整个大文件的串联,每个固定大小的文件均由MapedFile类提供操作服务;MapedFile类提供了顺序写、随机读、内存数据刷盘、内存清理等与文件相关的服务。
消息刷盘操作(commit)
主要功能是将内存中的消息写入磁盘文件中。方法是commit(final int flushLeastPages)。主要逻辑:
1)检查文件是否写满了,即写入位置(wrotePostion)是否等于文件大小(fileSize),若已经写满则进行刷盘操作;
2)检查内存中未刷的消息页数是否大于最小刷盘页数,即要刷盘的消息页数flushLeastPages(每页默认大小为4K)是否大于【committedPosition(上次刷盘的位置)减去wrotePostion】/4K;若不够页数也暂不刷盘;
3)MapedFile的父类是ReferenceResource,该父类的作用是记录该MapedFile中的内存对象被引用的次数;该引用次数为正数表示资源可用即未被shutdown,当在刷盘之前将引用次数加1后为正数,则调用BtyeBuffer的force方法进行刷盘,再将committedPosition置为wrotePostion值;最后将引用次数减1;若引起次数为不为正数则将直接committedPosition置为wrotePostion值
每个queue都有一个对应consumequeue文件
consumequeue中存放的是一串20字节定长的二进制数据,顺序读顺序写
commitlog是消息存放的物理文件,每台broker上的commitLog被本机所有的queue共享,不做区分,消息存储单元长度不固定,文件顺序写,随机读,简而言之,broker端接收一条消息后,如果消息需要落盘,则会在commitlog写入整条消息,并在consumequeue写入该消息的索引信息,消息被消费时,根据consumequeue中的信息去commitlog去获取信息,rocketmq在消息被消费
后,并不会去conmmitlog中删除数据,而是保存三天(可以进行配置)而且批量删除。rocketmq支持同步刷盘和异步刷盘,同步值producer将消息送至broker后,等待消息存入commitlog和consumequeue后,才算发送成功,异步刷盘则是将消息发送至broker后,broker将消息放入内存则告知producer消息发送成功,而后由broker自行将内存中的消息批量刷入磁盘;
consumer端的读取消息是首先找到对应的consumequeue再跟进commitlog的偏移量找到commitlog中的消息
1.根据topic和queueId来组织文件,图中TopicA有两个队列0,1,那么TopicA和QueueId=0组成一个ConsumeQueue,TopicA和QueueId=1组成另一个ConsumeQueue。
2.按照消费端的GroupName来分组重试队列,如果消费端消费失败,消息将被发往重试队列中,比如图中的%RETRY%ConsumerGroupA。
3.按照消费端的GroupName来分组死信队列,如果消费端消费失败,并重试指定次数后,仍然失败,则发往死信队列,比如图中的%DLQ%ConsumerGroupA。
consumerQueue结构如下
CommitLog Offset是指这条消息在Commit Log文件中的实际偏移量
Size存储中消息的大小
Message Tag HashCode存储消息的Tag的哈希值:主要用于订阅时消息过滤(订阅时如果指定了Tag,会根据HashCode来快速查找到订阅的消息)
Commit Log
CommitLog:消息存放的物理文件,每台broker
上的commitlog
被本机所有的queue
共享,不做任何区分。
CommitLog的消息存储单元长度不固定,文件顺序写,随机读。
三.消息消费:
RocketMQ消息订阅有两种模式,一种是Push模式,即Broker主动向消费端推送;另外一种是Pull模式,即消费端在需要时,主动到Broker拉取。但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式。
Consumer端每隔一段时间主动向broker发送拉消息请求,broker在收到Pull请求后,如果有消息就立即返回数据,Consumer端收到返回的消息后,再回调消费者设置的Listener方法。如果broker在收到Pull请求时,消息队列里没有数据,broker端会阻塞请求直到有数据传递或超时才返回。
与Kafka类似,Kafka中Consumer数量不能大于Partition数量,而在RocketMQ中Consumer的数量也不能大于队列(Queue)的数量,如果Consumer超过队列数量,那么多余的Consumer将不能消费消息。可以简单理解为queue与consumer的关系是多对一的关系。
顺序消息如何实现
producer端将同一个订单的消息顺序发送到同一个队列
new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
return mqs.get(id);
}
}
consumer端单线程顺序消费
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );
for (MessageExt msg: msgs) {
System.out.println(msg + ", content:" + new String(msg.getBody()));
}
try {
//模拟业务逻辑处理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
事务型消息TransactionProducer类例子
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
/**
* [transactionCheckListener 监听处理 当RocketMQ发现`Prepared消息`时,会根据这个Listener实现的策略来决断事务]
* @type {TransactionCheckListenerImpl}
*/
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
producer.setCheckThreadPoolMinSize(2);
producer.setCheckThreadPoolMaxSize(2);
producer.setCheckRequestHoldMax(2000);
producer.setTransactionCheckListener(transactionCheckListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
/**
* [tranExecuter 本地事务的处理逻辑也就是发送方的业务处理]
* @type {TransactionExecuterImpl}
*/
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
for (int i = 0; i < 100; i++) {
try {
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
/**
* [sendResult 消息发送]
* @type {[type]}
*/
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
接下来看下sendMessageInTransaction方法;
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg)
throws MQClientException {
。。。
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
/**
* [localTransactionState 如果消息发送成功,处理与消息关联的本地事务单元]
* @type {[type]}
*/
localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
/**
* 事务结束
*/
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
。。。
}
以上是关于rocketmq源码心得的主要内容,如果未能解决你的问题,请参考以下文章