RocketMQ Source Code Series (20): Design Philosophy
Posted by Dream_it_possible
RocketMQ version: 4.8.0
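Single responsibility
The single-responsibility principle is an important property in architecture design. From something as large as a module down to something as small as a class, each unit should handle only what falls within its own responsibility and keep coupling with other modules to a minimum; in short, high cohesion and low coupling.
Modules
Each module's responsibility is essentially single. For example, the logging module handles only logging and exposes a logging service to the other modules; a class needs just one line of code to use it.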
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
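Classes
For example, the KVConfigManager class in the namesrv module is the NameServer's basic configuration manager, so it is responsible only for loading, updating, and deleting configuration and for looking up a value by namespace and key.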
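To make that narrow scope concrete, here is a minimal sketch of a class that does nothing but manage namespace/key/value entries. SimpleKvConfig and its method names are hypothetical and are not the real KVConfigManager API; locking and persistence are deliberately left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a namespace -> key -> value config manager.
// Not thread-safe and not persisted; both concerns are left out on purpose.
class SimpleKvConfig {
    private final Map<String, Map<String, String>> configTable = new HashMap<>();

    // Create or update one entry inside a namespace.
    void put(String namespace, String key, String value) {
        configTable.computeIfAbsent(namespace, ns -> new HashMap<>()).put(key, value);
    }

    // Return the value for (namespace, key), or null if either is unknown.
    String get(String namespace, String key) {
        Map<String, String> kv = configTable.get(namespace);
        return kv == null ? null : kv.get(key);
    }

    // Remove one entry; unknown namespaces and keys are ignored.
    void delete(String namespace, String key) {
        Map<String, String> kv = configTable.get(namespace);
        if (kv != null) {
            kv.remove(key);
        }
    }
}
```

Everything the class exposes is about configuration entries, which is the point of the principle: a caller never has to look here for networking or routing logic.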
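Unified wrapping of requests and responses
RocketMQ's communication is built on Netty. It uses the RemotingCommand class to wrap both requests and responses: each request or response can be seen as an action, and all of them can be wrapped as a RemotingCommand.
For example, creating a request to register (or unregister) a broker: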
RemotingCommand request =
RemotingCommand.createRequestCommand(reg ? RequestCode.REGISTER_BROKER : RequestCode.UNREGISTER_BROKER, requestHeader);
Creating the response once broker registration has completed:
final RemotingCommand response =
RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
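The request and response payloads are each wrapped by their own implementation classes. The benefit is uniform handling: the receiver only needs to run different business logic depending on the request code, and responses are likewise distinguished by code.
Here are the system response codes: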
package org.apache.rocketmq.remoting.protocol;
public class RemotingSysResponseCode {
public static final int SUCCESS = 0;
public static final int SYSTEM_ERROR = 1;
public static final int SYSTEM_BUSY = 2;
public static final int REQUEST_CODE_NOT_SUPPORTED = 3;
public static final int TRANSACTION_FAILED = 4;
}
After a request arrives from Netty, it is handled uniformly in processRequest:
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.UPDATE_AND_CREATE_TOPIC:
return this.updateAndCreateTopic(ctx, request);
case RequestCode.DELETE_TOPIC_IN_BROKER:
return this.deleteTopic(ctx, request);
case RequestCode.GET_ALL_TOPIC_CONFIG:
return this.getAllTopicConfig(ctx, request);
case RequestCode.UPDATE_BROKER_CONFIG:
return this.updateBrokerConfig(ctx, request);
case RequestCode.GET_BROKER_CONFIG:
return this.getBrokerConfig(ctx, request);
case RequestCode.SEARCH_OFFSET_BY_TIMESTAMP:
return this.searchOffsetByTimestamp(ctx, request);
case RequestCode.GET_MAX_OFFSET:
return this.getMaxOffset(ctx, request);
case RequestCode.GET_MIN_OFFSET:
return this.getMinOffset(ctx, request);
case RequestCode.GET_EARLIEST_MSG_STORETIME:
return this.getEarliestMsgStoretime(ctx, request);
case RequestCode.GET_BROKER_RUNTIME_INFO:
return this.getBrokerRuntimeInfo(ctx, request);
case RequestCode.LOCK_BATCH_MQ:
return this.lockBatchMQ(ctx, request);
case RequestCode.UNLOCK_BATCH_MQ:
return this.unlockBatchMQ(ctx, request);
case RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP:
return this.updateAndCreateSubscriptionGroup(ctx, request);
case RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG:
return this.getAllSubscriptionGroup(ctx, request);
case RequestCode.DELETE_SUBSCRIPTIONGROUP:
return this.deleteSubscriptionGroup(ctx, request);
case RequestCode.GET_TOPIC_STATS_INFO:
return this.getTopicStatsInfo(ctx, request);
case RequestCode.GET_CONSUMER_CONNECTION_LIST:
return this.getConsumerConnectionList(ctx, request);
case RequestCode.GET_PRODUCER_CONNECTION_LIST:
return this.getProducerConnectionList(ctx, request);
case RequestCode.GET_CONSUME_STATS:
return this.getConsumeStats(ctx, request);
case RequestCode.GET_ALL_CONSUMER_OFFSET:
return this.getAllConsumerOffset(ctx, request);
case RequestCode.GET_ALL_DELAY_OFFSET:
return this.getAllDelayOffset(ctx, request);
case RequestCode.INVOKE_BROKER_TO_RESET_OFFSET:
return this.resetOffset(ctx, request);
case RequestCode.INVOKE_BROKER_TO_GET_CONSUMER_STATUS:
return this.getConsumerStatus(ctx, request);
case RequestCode.QUERY_TOPIC_CONSUME_BY_WHO:
return this.queryTopicConsumeByWho(ctx, request);
case RequestCode.REGISTER_FILTER_SERVER:
return this.registerFilterServer(ctx, request);
case RequestCode.QUERY_CONSUME_TIME_SPAN:
return this.queryConsumeTimeSpan(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER:
return this.getSystemTopicListFromBroker(ctx, request);
case RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE:
return this.cleanExpiredConsumeQueue();
case RequestCode.CLEAN_UNUSED_TOPIC:
return this.cleanUnusedTopic();
case RequestCode.GET_CONSUMER_RUNNING_INFO:
return this.getConsumerRunningInfo(ctx, request);
case RequestCode.QUERY_CORRECTION_OFFSET:
return this.queryCorrectionOffset(ctx, request);
case RequestCode.CONSUME_MESSAGE_DIRECTLY:
return this.consumeMessageDirectly(ctx, request);
case RequestCode.CLONE_GROUP_OFFSET:
return this.cloneGroupOffset(ctx, request);
case RequestCode.VIEW_BROKER_STATS_DATA:
return ViewBrokerStatsData(ctx, request);
case RequestCode.GET_BROKER_CONSUME_STATS:
return fetchAllConsumeStatsInBroker(ctx, request);
case RequestCode.QUERY_CONSUME_QUEUE:
return queryConsumeQueue(ctx, request);
case RequestCode.UPDATE_AND_CREATE_ACL_CONFIG:
return updateAndCreateAccessConfig(ctx, request);
case RequestCode.DELETE_ACL_CONFIG:
return deleteAccessConfig(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_ACL_INFO:
return getBrokerAclConfigVersion(ctx, request);
case RequestCode.UPDATE_GLOBAL_WHITE_ADDRS_CONFIG:
return updateGlobalWhiteAddrsConfig(ctx, request);
case RequestCode.RESUME_CHECK_HALF_MESSAGE:
return resumeCheckHalfMessage(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_ACL_CONFIG:
return getBrokerClusterAclConfig(ctx, request);
default:
break;
}
return null;
}
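The same dispatch-by-code idea can be sketched without the long switch by keeping handlers in a map. Command and Dispatcher below are hypothetical illustrations, not RocketMQ classes, and unlike the excerpt above (which returns null for an unknown code) this sketch answers with REQUEST_CODE_NOT_SUPPORTED; that choice is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical envelope: one class carries both requests and responses.
final class Command {
    final int code;
    final String body;

    Command(int code, String body) {
        this.code = code;
        this.body = body;
    }
}

// Hypothetical dispatcher: routes each request to a handler by its code.
final class Dispatcher {
    static final int SUCCESS = 0;
    static final int REQUEST_CODE_NOT_SUPPORTED = 3;

    private final Map<Integer, Function<Command, Command>> handlers = new HashMap<>();

    // Associate a request code with the business logic that handles it.
    void register(int requestCode, Function<Command, Command> handler) {
        handlers.put(requestCode, handler);
    }

    // Look up the handler for the request code; reply with an error code if none exists.
    Command process(Command request) {
        Function<Command, Command> handler = handlers.get(request.code);
        if (handler == null) {
            return new Command(REQUEST_CODE_NOT_SUPPORTED,
                "unsupported request code " + request.code);
        }
        return handler.apply(request);
    }
}
```

Because every message has the same shape, adding a new operation only means registering one more handler; the transport and the dispatch loop stay unchanged.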
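Persisting and backing up core configuration
Configuration-related functionality is abstracted into a class, ConfigManager, which provides encoding, decoding, loading the configuration, loading the backup file, and persisting the configuration.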
package org.apache.rocketmq.common;
import java.io.IOException;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
public abstract class ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
public abstract String encode();
// load the configuration
public boolean load() {
String fileName = null;
try {
fileName = this.configFilePath();
String jsonString = MixAll.file2String(fileName);
if (null == jsonString || jsonString.length() == 0) {
return this.loadBak();
} else {
this.decode(jsonString);
log.info("load " + fileName + " OK");
return true;
}
} catch (Exception e) {
log.error("load " + fileName + " failed, and try to load backup file", e);
return this.loadBak();
}
}
public abstract String configFilePath();
// load the backup file
private boolean loadBak() {
String fileName = null;
try {
fileName = this.configFilePath();
String jsonString = MixAll.file2String(fileName + ".bak");
if (jsonString != null && jsonString.length() > 0) {
this.decode(jsonString);
log.info("load " + fileName + " OK");
return true;
}
} catch (Exception e) {
log.error("load " + fileName + " Failed", e);
return false;
}
return true;
}
public abstract void decode(final String jsonString);
// persist to disk
public synchronized void persist() {
String jsonString = this.encode(true);
if (jsonString != null) {
String fileName = this.configFilePath();
try {
MixAll.string2File(jsonString, fileName);
} catch (IOException e) {
log.error("persist file " + fileName + " exception", e);
}
}
}
public abstract String encode(final boolean prettyFormat);
}
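Subclasses implement configFilePath() (along with encode() and decode()), because each module has its own configuration and can keep it in its own file.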
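The template-method shape of ConfigManager can be sketched with only the JDK. FileBackedConfig and GreetingConfig are hypothetical names, and the behavior is simplified on purpose: here persist() copies the previous file to ".bak" itself and load() returns false when neither file is usable, which is an assumption of this sketch rather than a description of MixAll or of ConfigManager.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

// Hypothetical base class: owns the load/persist flow, subclasses supply the details.
abstract class FileBackedConfig {
    abstract String configFilePath();
    abstract String encode();
    abstract void decode(String text);

    // Try the main file first, then fall back to the ".bak" copy.
    boolean load() {
        return loadFrom(Paths.get(configFilePath()))
            || loadFrom(Paths.get(configFilePath() + ".bak"));
    }

    private boolean loadFrom(Path file) {
        try {
            if (!Files.exists(file)) {
                return false;
            }
            String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            if (text.isEmpty()) {
                return false;
            }
            decode(text);
            return true;
        } catch (IOException | RuntimeException e) {
            return false;
        }
    }

    // Keep the previous version as ".bak", then write the new content.
    synchronized boolean persist() {
        Path main = Paths.get(configFilePath());
        try {
            if (Files.exists(main)) {
                Files.copy(main, Paths.get(configFilePath() + ".bak"),
                    StandardCopyOption.REPLACE_EXISTING);
            }
            Files.write(main, encode().getBytes(StandardCharsets.UTF_8));
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}

// Hypothetical subclass: decides only where the file lives and how to (de)serialize.
class GreetingConfig extends FileBackedConfig {
    private final String path;
    String greeting = "hello";

    GreetingConfig(String path) {
        this.path = path;
    }

    @Override String configFilePath() { return path; }
    @Override String encode() { return greeting; }
    @Override void decode(String text) { greeting = text; }
}
```

The base class never needs to know what is being stored, so each module can add its own subclass and file without touching the shared load and backup logic.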
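Effective locking
In high-concurrency scenarios, shared objects raise thread-safety issues; without locking, problems such as inconsistent data can occur.
For example, a read-write lock is used: registering a broker takes the write lock.
In registerBroker, a try{}finally{} block acquires and releases the write lock, and that block is nested inside a try{}catch(Exception e){}. Holding the write lock makes the table update appear as one atomic step to other threads, and the finally clause guarantees the lock is released.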
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
this.lock.writeLock().lockInterruptibly();
// write the broker information into the tables
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}
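One detail of the excerpt is worth noting: lockInterruptibly() sits inside the inner try, so if the thread is interrupted before it gets the lock, the finally block still calls unlock() on a lock the thread does not hold, which throws IllegalMonitorStateException (swallowed there by the outer catch). A common variant acquires the lock before entering the try. The sketch below shows that variant; BrokerRegistry and its methods are hypothetical and are not RocketMQ code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical registry guarded by a read-write lock.
class BrokerRegistry {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, String> brokerAddrTable = new HashMap<>();

    // Writers take the exclusive write lock; returns false if interrupted while waiting.
    boolean register(String brokerName, String brokerAddr) {
        try {
            // Acquire outside the try/finally so unlock() only runs when the lock is held.
            lock.writeLock().lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
        try {
            brokerAddrTable.put(brokerName, brokerAddr);
            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Readers share the read lock, so lookups do not block each other.
    String lookup(String brokerName) {
        lock.readLock().lock();
        try {
            return brokerAddrTable.get(brokerName);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Splitting readers from writers suits data that is read far more often than it is updated, since concurrent lookups proceed in parallel and only registration is serialized.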
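Tracing important methods with hook functions
A hook function lets you add custom logic before and after a method runs: before the method you can initialize parameters, after it you can release resources, and hooks can also be used to trace messages.
For example, the broker module has hooks such as ConsumeMessageHook and SendMessageHook: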
package org.apache.rocketmq.broker.mqtrace;
public interface ConsumeMessageHook {
String hookName();
void consumeMessageBefore(final ConsumeMessageContext context);
void consumeMessageAfter(final ConsumeMessageContext context);
}
Use the registerConsumeMessageHook() method to add custom hooks to pullMessageProcessor.
@Test
public void testProcessRequest_FoundWithHook() throws RemotingCommandException {
GetMessageResult getMessageResult = createGetMessageResult();
when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult);
List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<>();
final ConsumeMessageContext[] messageContext = new ConsumeMessageContext[1];
ConsumeMessageHook consumeMessageHook = new ConsumeMessageHook() {
@Override
public String hookName() {
return "TestHook";
}
@Override
public void consumeMessageBefore(ConsumeMessageContext context) {
System.out.println("before consuming the message");
messageContext[0] = context;
}
// why is consumeMessageAfter not used here?
@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
System.out.println("message consumed!");
}
};
consumeMessageHookList.add(consumeMessageHook);
pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(messageContext[0]).isNotNull();
assertThat(messageContext[0].getConsumerGroup()).isEqualTo(group);
assertThat(messageContext[0].getTopic()).isEqualTo(topic);
assertThat(messageContext[0].getQueueId()).isEqualTo(1);
}
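The mechanism behind such hooks can be sketched in a few lines: the processor keeps a list of hooks and calls them around the real work. CallHook and HookedProcessor are hypothetical names for illustration, and running the "after" callbacks in a finally block is a design choice of this sketch, not a statement about how the broker invokes its hooks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical hook interface modeled loosely on the before/after pair above.
interface CallHook {
    String hookName();
    void before(String context);
    void after(String context);
}

// Hypothetical processor that wraps an action with all registered hooks.
class HookedProcessor {
    private final List<CallHook> hooks = new ArrayList<>();

    void registerHook(CallHook hook) {
        hooks.add(hook);
    }

    // Run every "before" hook, then the action, then every "after" hook.
    <T> T process(String context, Supplier<T> action) {
        for (CallHook hook : hooks) {
            hook.before(context);
        }
        try {
            return action.get();
        } finally {
            // Runs even if the action throws, so hooks can release resources.
            for (CallHook hook : hooks) {
                hook.after(context);
            }
        }
    }
}
```

The processor itself stays unaware of what the hooks do, so tracing or metrics can be added and removed without editing the processing code.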
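Modules can be unit tested effectively
With JUnit you can test a module's core methods, using @Before for initialization and @After for cleanup, such as shutting down thread pools.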
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import java.lang.reflect.Field;
import java.util.Map;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class ProducerManagerTest {
private ProducerManager producerManager;
private String group = "FooBar";
private ClientChannelInfo clientInfo;
@Mock
private Channel channel;
@Before
public void init() {
producerManager = new ProducerManager();
clientInfo = new ClientChannelInfo(channel, "clientId", LanguageCode.JAVA, 0);
}
@Test
public void scanNotActiveChannel() throws Exception {
producerManager.registerProducer(group, clientInfo);
assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNotNull();
assertThat(producerManager.findChannel("clientId")).isNotNull();
Field field = ProducerManager.class.getDeclaredField("CHANNEL_EXPIRED_TIMEOUT");
field.setAccessible(true);
long CHANNEL_EXPIRED_TIMEOUT = field.getLong(producerManager);
clientInfo.setLastUpdateTimestamp(System.currentTimeMillis() - CHANNEL_EXPIRED_TIMEOUT - 10);
when(channel.close()).thenReturn(mock(ChannelFuture.class));
producerManager.scanNotActiveChannel();
assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNull();
assertThat(producerManager.findChannel("clientId")).isNull();
}
@Test
public void doChannelCloseEvent() throws Exception {
producerManager.registerProducer(group, clientInfo);
assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNotNull();
assertThat(producerManager.findChannel("clientId")).isNotNull();
producerManager.doChannelCloseEvent("127.0.0.1", channel);
assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNull();
assertThat(producerManager.findChannel("clientId")).isNull();
}
@Test
public void testRegisterProducer() throws Exception {
producerManager.registerProducer(group, clientInfo);
Map<Channel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
Channel channel1 = producerManager.findChannel("clientId");
assertThat(channelMap).isNotNull();
assertThat(channel1).isNotNull();
assertThat(channelMap.get(channel)).isEqualTo(clientInfo);
assertThat(channel1).isEqualTo(channel);
}
@Test
public void unregisterProducer() throws Exception {
producerManager.registerProducer(group, clientInfo);
Map<Channel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
assertThat(channelMap).isNotNull();
assertThat(channelMap.get(channel)).isEqualTo(clientInfo);
Channel channel1 = producerManager.findChannel("clientId");
assertThat(channel1).isNotNull();
assertThat(channel1).isEqualTo(channel);
producerManager.unregisterProducer(group, clientInfo);
channelMap = producerManager.getGroupChannelTable().get(group);
channel1 = producerManager.findChannel("clientId");
assertThat(channelMap).isNull();
assertThat(channel1).isNull();
}
@Test
public void testGetGroupChannelTable() throws Exception {
producerManager.registerProducer(group, clientInfo);
Map<Channel, ClientChannelInfo> oldMap = producerManager.getGroupChannelTable().get(group);
producerManager.unregisterProducer(group, clientInfo);
assertThat(oldMap.size()).isEqualTo(0);
}
@Test
public void testGetAvailableChannel() {
producerManager.registerProducer(group, clientInfo);
when(channel.isActive()).thenReturn(true);
when(channel.isWritable()).thenReturn(true);
Channel c = producerManager.getAvailableChannel(group);
assertThat(c).isSameAs(channel);
when(channel.isWritable()).thenReturn(false);
c = producerManager.getAvailableChannel(group);
assertThat(c).isSameAs(channel);
when(channel.isActive()).thenReturn(false);
c = producerManager.getAvailableChannel(group);
assertThat(c).isNull();
}
}