RocketMQ源码系列(二十) 设计思想篇

Posted Dream_it_possible!

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码系列(二十) 设计思想篇相关的知识,希望对你有一定的参考价值。

rocketmq 版本: 4.8.0

单一职责

        单一原则是架构设计中非常重要的一个特性,大到模块,小到类,尽量能做到只负责自己职责范围之内的事情,尽最大限度减少模块之间的耦合,简称高内聚、低耦合。

模块

        每个模块的职责负责的范围基本实现单一,例如日志模块 logging, logging模块只负责日志功能,对外模块提供日志服务,在类里只需要加上一行代码就能享受到日志的功能。

    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

        例如NameSrv模块的KVConfigManager类,该类是NameSrv的基础的配置管理器, 因此他只负责加载配置、更新配置、删除配置、根据namespace和key来获取value等。

统一封装request和response

        rocketmq的通信是基于netty实现的,他统一采用RemotingComand的实现类来封装request和response, 每个请求和响应都可以看成一个动作, 所有请求和响应都可以包装成RmotingCommand。

        例如,创建一个注册broker的请求:

RemotingCommand request =
RemotingCommand.createRequestCommand(reg ? RequestCode.REGISTER_BROKER : RequestCode.UNREGISTER_BROKER, requestHeader);

        创建一个注册broker完成后的响应:

final RemotingCommand response = 
RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);

        请求和响应的体的封装均有实现类去实现,这样做的好处是实现统一处理请求,只需要根据不同的请求Code去处理不同的业务逻辑,响应也根据code来进行响应。

        看一下有哪些responseCode

package org.apache.rocketmq.remoting.protocol;

public class RemotingSysResponseCode {

    public static final int SUCCESS = 0;

    public static final int SYSTEM_ERROR = 1;

    public static final int SYSTEM_BUSY = 2;

    public static final int REQUEST_CODE_NOT_SUPPORTED = 3;

    public static final int TRANSACTION_FAILED = 4;
}

        接收到netty的请求后,统一处理请求proceessRequest

   @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.UPDATE_AND_CREATE_TOPIC:
                return this.updateAndCreateTopic(ctx, request);
            case RequestCode.DELETE_TOPIC_IN_BROKER:
                return this.deleteTopic(ctx, request);
            case RequestCode.GET_ALL_TOPIC_CONFIG:
                return this.getAllTopicConfig(ctx, request);
            case RequestCode.UPDATE_BROKER_CONFIG:
                return this.updateBrokerConfig(ctx, request);
            case RequestCode.GET_BROKER_CONFIG:
                return this.getBrokerConfig(ctx, request);
            case RequestCode.SEARCH_OFFSET_BY_TIMESTAMP:
                return this.searchOffsetByTimestamp(ctx, request);
            case RequestCode.GET_MAX_OFFSET:
                return this.getMaxOffset(ctx, request);
            case RequestCode.GET_MIN_OFFSET:
                return this.getMinOffset(ctx, request);
            case RequestCode.GET_EARLIEST_MSG_STORETIME:
                return this.getEarliestMsgStoretime(ctx, request);
            case RequestCode.GET_BROKER_RUNTIME_INFO:
                return this.getBrokerRuntimeInfo(ctx, request);
            case RequestCode.LOCK_BATCH_MQ:
                return this.lockBatchMQ(ctx, request);
            case RequestCode.UNLOCK_BATCH_MQ:
                return this.unlockBatchMQ(ctx, request);
            case RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP:
                return this.updateAndCreateSubscriptionGroup(ctx, request);
            case RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG:
                return this.getAllSubscriptionGroup(ctx, request);
            case RequestCode.DELETE_SUBSCRIPTIONGROUP:
                return this.deleteSubscriptionGroup(ctx, request);
            case RequestCode.GET_TOPIC_STATS_INFO:
                return this.getTopicStatsInfo(ctx, request);
            case RequestCode.GET_CONSUMER_CONNECTION_LIST:
                return this.getConsumerConnectionList(ctx, request);
            case RequestCode.GET_PRODUCER_CONNECTION_LIST:
                return this.getProducerConnectionList(ctx, request);
            case RequestCode.GET_CONSUME_STATS:
                return this.getConsumeStats(ctx, request);
            case RequestCode.GET_ALL_CONSUMER_OFFSET:
                return this.getAllConsumerOffset(ctx, request);
            case RequestCode.GET_ALL_DELAY_OFFSET:
                return this.getAllDelayOffset(ctx, request);
            case RequestCode.INVOKE_BROKER_TO_RESET_OFFSET:
                return this.resetOffset(ctx, request);
            case RequestCode.INVOKE_BROKER_TO_GET_CONSUMER_STATUS:
                return this.getConsumerStatus(ctx, request);
            case RequestCode.QUERY_TOPIC_CONSUME_BY_WHO:
                return this.queryTopicConsumeByWho(ctx, request);
            case RequestCode.REGISTER_FILTER_SERVER:
                return this.registerFilterServer(ctx, request);
            case RequestCode.QUERY_CONSUME_TIME_SPAN:
                return this.queryConsumeTimeSpan(ctx, request);
            case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER:
                return this.getSystemTopicListFromBroker(ctx, request);
            case RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE:
                return this.cleanExpiredConsumeQueue();
            case RequestCode.CLEAN_UNUSED_TOPIC:
                return this.cleanUnusedTopic();
            case RequestCode.GET_CONSUMER_RUNNING_INFO:
                return this.getConsumerRunningInfo(ctx, request);
            case RequestCode.QUERY_CORRECTION_OFFSET:
                return this.queryCorrectionOffset(ctx, request);
            case RequestCode.CONSUME_MESSAGE_DIRECTLY:
                return this.consumeMessageDirectly(ctx, request);
            case RequestCode.CLONE_GROUP_OFFSET:
                return this.cloneGroupOffset(ctx, request);
            case RequestCode.VIEW_BROKER_STATS_DATA:
                return ViewBrokerStatsData(ctx, request);
            case RequestCode.GET_BROKER_CONSUME_STATS:
                return fetchAllConsumeStatsInBroker(ctx, request);
            case RequestCode.QUERY_CONSUME_QUEUE:
                return queryConsumeQueue(ctx, request);
            case RequestCode.UPDATE_AND_CREATE_ACL_CONFIG:
                return updateAndCreateAccessConfig(ctx, request);
            case RequestCode.DELETE_ACL_CONFIG:
                return deleteAccessConfig(ctx, request);
            case RequestCode.GET_BROKER_CLUSTER_ACL_INFO:
                return getBrokerAclConfigVersion(ctx, request);
            case RequestCode.UPDATE_GLOBAL_WHITE_ADDRS_CONFIG:
                return updateGlobalWhiteAddrsConfig(ctx, request);
            case RequestCode.RESUME_CHECK_HALF_MESSAGE:
                return resumeCheckHalfMessage(ctx, request);
            case RequestCode.GET_BROKER_CLUSTER_ACL_CONFIG:
                return getBrokerClusterAclConfig(ctx, request);
            default:
                break;
        }

        return null;
    }

核心配置持久化和备份

        将配置相关的功能抽象出来一个类ConfigManager,提供编码、解码、加载配置和备份文件和持久化配置的功能。

package org.apache.rocketmq.common;

import java.io.IOException;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;

public abstract class ConfigManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);

    public abstract String encode();

    // 加载配置
    public boolean load() {
        String fileName = null;
        try {
            fileName = this.configFilePath();
            String jsonString = MixAll.file2String(fileName);

            if (null == jsonString || jsonString.length() == 0) {
                return this.loadBak();
            } else {
                this.decode(jsonString);
                log.info("load " + fileName + " OK");
                return true;
            }
        } catch (Exception e) {
            log.error("load " + fileName + " failed, and try to load backup file", e);
            return this.loadBak();
        }
    }

    public abstract String configFilePath();
    // 加载备份
    private boolean loadBak() {
        String fileName = null;
        try {
            fileName = this.configFilePath();
            String jsonString = MixAll.file2String(fileName + ".bak");
            if (jsonString != null && jsonString.length() > 0) {
                this.decode(jsonString);
                log.info("load " + fileName + " OK");
                return true;
            }
        } catch (Exception e) {
            log.error("load " + fileName + " Failed", e);
            return false;
        }

        return true;
    }

    public abstract void decode(final String jsonString);
    
    // 持久化
    public synchronized void persist() {
        String jsonString = this.encode(true);
        if (jsonString != null) {
            String fileName = this.configFilePath();
            try {
                MixAll.string2File(jsonString, fileName);
            } catch (IOException e) {
                log.error("persist file " + fileName + " exception", e);
            }
        }
    }

    public abstract String encode(final boolean prettyFormat);
}

        由子类去实现configFilePath()方法,因为不同的模块存在的配置不相同,可以分别建档。

有效加锁

        在高并发场景中,共享的对象会有线程安全的问题,如果不加锁,那么会出现数据不一致等问题。

        举个栗子,读写锁的运用, 在注册Broker时用到了写锁。

        在registBroker时,使用try{}finally{} 加写锁和释放写锁, try{}finally{} 要放在try{}catch(Exception e){}里,保证操作的原子性。

   public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            try {
                this.lock.writeLock().lockInterruptibly();
                 // 将broker信息写入到table中
            }finaly{
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        }

利用勾子函数追踪重要的方法

        使用勾子函数,能够在执行方法的前后添加自定义逻辑,方法前可以用来初始化一些参数,在方法后可以关闭一些资源,可以通过勾子函数去追踪有效消息。

        例如broker模块的一些勾子函数,ConsumeMesageHook和SendMessageHook

package org.apache.rocketmq.broker.mqtrace;

public interface ConsumeMessageHook {
    String hookName();

    void consumeMessageBefore(final ConsumeMessageContext context);

    void consumeMessageAfter(final ConsumeMessageContext context);
}

        使用registerConsumeMessageHook() 方法将自定义的勾子函数加到pullMessageProcessor里即可。

    @Test
    public void testProcessRequest_FoundWithHook() throws RemotingCommandException {
        GetMessageResult getMessageResult = createGetMessageResult();
        when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult);
        List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<>();
        final ConsumeMessageContext[] messageContext = new ConsumeMessageContext[1];
        ConsumeMessageHook consumeMessageHook = new ConsumeMessageHook() {
            @Override
            public String hookName() {
                return "TestHook";
            }

            @Override
            public void consumeMessageBefore(ConsumeMessageContext context) {
                System.out.println("消费消息前");
                messageContext[0] = context;
            }

            // 为什么没用到consumeMessageAfter
            @Override
            public void consumeMessageAfter(ConsumeMessageContext context) {
                System.out.println("消费消息完毕!");
            }
        };
        consumeMessageHookList.add(consumeMessageHook);
        pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
        final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
        RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
        assertThat(response).isNotNull();
        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
        assertThat(messageContext[0]).isNotNull();
        assertThat(messageContext[0].getConsumerGroup()).isEqualTo(group);
        assertThat(messageContext[0].getTopic()).isEqualTo(topic);
        assertThat(messageContext[0].getQueueId()).isEqualTo(1);
    }

 模块内能有效地进行单元测试

        可以利用Junit测试工具,对模块的核心方法进行测试,用@Before做初始化动作,用@After做测试收尾工作,比如关掉线程池等。

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.broker.client;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import java.lang.reflect.Field;
import java.util.Map;

import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
public class ProducerManagerTest {
    private ProducerManager producerManager;
    private String group = "FooBar";
    private ClientChannelInfo clientInfo;

    @Mock
    private Channel channel;

    @Before
    public void init() {
        producerManager = new ProducerManager();
        clientInfo = new ClientChannelInfo(channel, "clientId", LanguageCode.JAVA, 0);
    }

    @Test
    public void scanNotActiveChannel() throws Exception {
        producerManager.registerProducer(group, clientInfo);
        assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNotNull();
        assertThat(producerManager.findChannel("clientId")).isNotNull();
        Field field = ProducerManager.class.getDeclaredField("CHANNEL_EXPIRED_TIMEOUT");
        field.setAccessible(true);
        long CHANNEL_EXPIRED_TIMEOUT = field.getLong(producerManager);
        clientInfo.setLastUpdateTimestamp(System.currentTimeMillis() - CHANNEL_EXPIRED_TIMEOUT - 10);
        when(channel.close()).thenReturn(mock(ChannelFuture.class));
        producerManager.scanNotActiveChannel();
        assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNull();
        assertThat(producerManager.findChannel("clientId")).isNull();
    }

    @Test
    public void doChannelCloseEvent() throws Exception {
        producerManager.registerProducer(group, clientInfo);
        assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNotNull();
        assertThat(producerManager.findChannel("clientId")).isNotNull();
        producerManager.doChannelCloseEvent("127.0.0.1", channel);
        assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNull();
        assertThat(producerManager.findChannel("clientId")).isNull();
    }

    @Test
    public void testRegisterProducer() throws Exception {
        producerManager.registerProducer(group, clientInfo);
        Map<Channel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
        Channel channel1 = producerManager.findChannel("clientId");
        assertThat(channelMap).isNotNull();
        assertThat(channel1).isNotNull();
        assertThat(channelMap.get(channel)).isEqualTo(clientInfo);
        assertThat(channel1).isEqualTo(channel);
    }

    @Test
    public void unregisterProducer() throws Exception {
        producerManager.registerProducer(group, clientInfo);
        Map<Channel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
        assertThat(channelMap).isNotNull();
        assertThat(channelMap.get(channel)).isEqualTo(clientInfo);
        Channel channel1 = producerManager.findChannel("clientId");
        assertThat(channel1).isNotNull();
        assertThat(channel1).isEqualTo(channel);
        producerManager.unregisterProducer(group, clientInfo);
        channelMap = producerManager.getGroupChannelTable().get(group);
        channel1 = producerManager.findChannel("clientId");
        assertThat(channelMap).isNull();
        assertThat(channel1).isNull();

    }

    @Test
    public void testGetGroupChannelTable() throws Exception {
        producerManager.registerProducer(group, clientInfo);
        Map<Channel, ClientChannelInfo> oldMap = producerManager.getGroupChannelTable().get(group);
        
        producerManager.unregisterProducer(group, clientInfo);
        assertThat(oldMap.size()).isEqualTo(0);
    }

    @Test
    public void testGetAvailableChannel() {
        producerManager.registerProducer(group, clientInfo);

        when(channel.isActive()).thenReturn(true);
        when(channel.isWritable()).thenReturn(true);
        Channel c = producerManager.getAvailableChannel(group);
        assertThat(c).isSameAs(channel);

        when(channel.isWritable()).thenReturn(false);
        c = producerManager.getAvailableChannel(group);
        assertThat(c).isSameAs(channel);

        when(channel.isActive()).thenReturn(false);
        c = producerManager.getAvailableChannel(group);
        assertThat(c).isNull();
    }

}

以上是关于RocketMQ源码系列(二十) 设计思想篇的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ源码系列 消息store存储设计核心原理解析

面试连环炮系列(二十?四):为什么选择RocketMQ

RocketMQ源码系列 CommitLog 存取消息源码解析

深度挖掘RocketMQ底层源码「底层系列」深度挖掘RocketMQ底层导致消息丢失透析(Broker Busy和ToManyRequest)

spring cloud stream 3.1.2 源码搭配rocketmq学习

RocketMQ源码解析-NameServer篇