canal 源码解析系列-CanalInstance模块解析

Posted 犀牛饲养员

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了canal 源码解析系列-CanalInstance模块解析相关的知识,希望对你有一定的参考价值。

文章目录

下面涉及到源码的地方,我都经过了处理,删减了一些不重要的代码(比如参数校验),便于理解

正文

上一篇文章

canal 源码解析系列-CanalServerWithEmbedded解读

提到了CanalServerWithEmbedded 内部管理所有的CanalInstance,通过 Client 的信息(destination),找到 Client 订阅的 CanalInstance,然后调用 CanalInstance 内部的各个模块进行处理。

本篇就来深入解读下CanalInstance模块。先看幅图,

instance代表了一个实际运行的数据队列,包括了EventPaser,EventSink,EventStore等组件。每个组件后面我们会有单独的文章专门分析。

从这幅图我们可以看出instance是怎么生成的。

CanalInstanceGenerator相当于一个工厂类,通过 destination 产生特定的 CanalInstance,它有两个实现:

  • ManagerCanalInstanceGenerator类,manager方式: 和你自己的内部web console/manager系统进行对接。(目前主要是公司内部使用)
  • SpringCanalInstanceGenerator类,spring方式:基于spring xml + properties进行定义,构建spring配置.

具体使用哪个,是通过配置的,如下所示:

canal.properties文件

canal.instance.global.mode = spring

com.alibaba.otter.canal.deployer.CanalController#initGlobalConfig方法,

...
if (config.getMode().isManager()) {
                PlainCanalInstanceGenerator instanceGenerator = new PlainCanalInstanceGenerator(properties);
                instanceGenerator.setCanalConfigClient(managerClients.get(config.getManagerAddress()));
                instanceGenerator.setSpringXml(config.getSpringXml());
                return instanceGenerator.generate(destination);
            } else if (config.getMode().isSpring()) {
                SpringCanalInstanceGenerator instanceGenerator = new SpringCanalInstanceGenerator();
                instanceGenerator.setSpringXml(config.getSpringXml());
                return instanceGenerator.generate(destination);
            } else {
                throw new UnsupportedOperationException("unknow mode :" + config.getMode());
            }
            ...

先来看下spring的版本实现,

public CanalInstance generate(String destination) {
        synchronized (CanalEventParser.class) {
            try {
                // 设置当前正在加载的通道,加载spring查找文件时会用到该变量
                System.setProperty("canal.instance.destination", destination);
                //file-instance.xml文件,里面定义了一些bean
                this.beanFactory = getBeanFactory(springXml);
                String beanName = destination;
                if (!beanFactory.containsBean(beanName)) {
                    beanName = defaultName;
                }

            ...
        }
    }

canal提供了几种spring配置文件的模版给我们选择,如下图所示:

  • spring/memory-instance.xml
  • spring/file-instance.xml
  • spring/default-instance.xml
  • spring/group-instance.xml

然后部署的时候,我们可以通过在canal.properties配置文件中指定使用哪个文件:

canal.instance.global.spring.xml = classpath:spring/file-instance.xml

这几个文件的主要区别是,metaManager 和eventParser 这两个配置有所不同,可能在内存、文件或zk进行存储。

spring的配置文件示例如下:

<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
		<property name="destination" value="${canal.instance.destination}" />
		<property name="eventParser">
			<ref bean="eventParser" />
		</property>
		<property name="eventSink">
			<ref bean="eventSink" />
		</property>
		<property name="eventStore">
			<ref bean="eventStore" />
		</property>
		<property name="metaManager">
			<ref bean="metaManager" />
		</property>
		<property name="alarmHandler">
			<ref bean="alarmHandler" />
		</property>
	</bean>

generate方法返回的是CanalInstanceWithSpring这个实现类,它继承自AbstractCanalInstance,并且实现了CanalInstance。这个类的实现只有几十行,之所以这么少是因为大部分的逻辑都已经通过spring的配置文件实现了,如下:

<!-- 报警处理类 -->
	<bean id="alarmHandler" class="com.alibaba.otter.canal.common.alarm.LogAlarmHandler" />
	
	<bean id="metaManager" class="com.alibaba.otter.canal.meta.FileMixedMetaManager">
		<property name="dataDir" value="${canal.file.data.dir:../conf}" />
		<property name="period" value="${canal.file.flush.period:1000}" />
	</bean>
	
	<bean id="eventStore" class="com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer">
		<property name="bufferSize" value="${canal.instance.memory.buffer.size:16384}" />
		<property name="bufferMemUnit" value="${canal.instance.memory.buffer.memunit:1024}" />
		<property name="batchMode" value="${canal.instance.memory.batch.mode:MEMSIZE}" />
	</bean>
	
	<bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">
		<property name="eventStore" ref="eventStore" />
	</bean>
	...

ManagerCanalInstanceGenerator实现类似,这里就不多说了。它返回的是CanalInstanceWithManager,同样它继承自AbstractCanalInstance,并且实现了CanalInstance。类图如下:

核心的功能都在AbstractCanalInstance类中,我们来看下。

// 通知下订阅关系变化
    @Override
    public boolean subscribeChange(ClientIdentity identity) {
        if (StringUtils.isNotEmpty(identity.getFilter())) {
            logger.info("subscribe filter change to " + identity.getFilter());
            //订阅关系发生变化触发,主要是更新filter,这个filter决定了binlog订阅的库,表
            AviaterRegexFilter aviaterFilter = new AviaterRegexFilter(identity.getFilter());

            boolean isGroup = (eventParser instanceof GroupEventParser);
            if (isGroup) {
                // 处理group的模式
                List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers();
                for (CanalEventParser singleEventParser : eventParsers) {// 需要遍历启动
                    if(singleEventParser instanceof AbstractEventParser) {
                        ((AbstractEventParser) singleEventParser).setEventFilter(aviaterFilter);
                    }
                }
            } else {
                if(eventParser instanceof AbstractEventParser) {
                    ((AbstractEventParser) eventParser).setEventFilter(aviaterFilter);
                }
            }

        }

        // filter的处理规则
        // a. parser处理数据过滤处理
        // b. sink处理数据的路由&分发,一份parse数据经过sink后可以分发为多份,每份的数据可以根据自己的过滤规则不同而有不同的数据
        // 后续内存版的一对多分发,可以考虑
        return true;
    }

start方法和stop方法没什么可讲的,就是启停instance内部的组件。

beforeStartEventParserafterStartEventParser是eventParser启动的前置和后置操作。前者调用了startEventParserInternal,后者调用了stopEventParserInternal

protected void startEventParserInternal(CanalEventParser eventParser, boolean isGroup) {
        if (eventParser instanceof AbstractEventParser) {
           ...
            // 首先启动log position管理器
            CanalLogPositionManager logPositionManager = abstractEventParser.getLogPositionManager();
            if (!logPositionManager.isStart()) {
                logPositionManager.start();
            }
        }

            ...
            CanalHAController haController = mysqlEventParser.getHaController();
            ...
                haController.start();

        }
    }
protected void stopEventParserInternal(CanalEventParser eventParser) {
        if (eventParser instanceof AbstractEventParser) {
            ...
            // 首先启动log position管理器
            CanalLogPositionManager logPositionManager = abstractEventParser.getLogPositionManager();
            if (logPositionManager.isStart()) {
                logPositionManager.stop();
            }
        }

            ....
            CanalHAController haController = mysqlEventParser.getHaController();
            if (haController.isStart()) {
                haController.stop();
            }
    }

就是分别负责了CanalLogPositionManagerCanalHAController的启动停止工作。

CanalLogPositionManager记录binlog最后一次解析成功位置,有不同的实现,可以保存在内存,zk等存在介质里。mysql在主从同步过程中,slave自己需要维护binlog的消费进度信息。而canal伪装成slave,因此也要维护这样的信息。

CanalHAController主要是通过失败检测, 控制 EventParser 的链接主机管理,判断当前该链接哪个mysql数据库。它只有一个实现类HeartBeatHAController

失败转换的逻辑也很简单,定时发送心跳语句到当前链接的数据库,超过一定次数检测失败时,尝试切换到备机

//心跳发送成功
    public void onSuccess(long costTime) {
        failedTimes = 0;
    }

    //心跳发送失败
    public void onFailed(Throwable e) {
        failedTimes++;
        // 检查一下是否超过失败次数
        synchronized (this) {
            if (failedTimes > detectingRetryTimes) {
                if (switchEnable) {
                    eventParser.doSwitch();// 通知执行一次切换
                    failedTimes = 0;
                } else {
                    logger.warn("HeartBeat failed Times:{} , should auto switch ?", failedTimes);
                }
            }
        }
    }

心跳的逻辑在 com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.MysqlDetectingTimeTask 实现,是个定时器。

class MysqlDetectingTimeTask extends TimerTask {

       ...

        public void run() {
            try {
                ...

                // 可能心跳sql为select 1
                if (StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "select")
                    || StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "show")
                    || StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "explain")
                    || StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "desc")) {
                    mysqlConnection.query(detectingSQL);
                } else {
                    mysqlConnection.update(detectingSQL);
                }

                long costTime = System.currentTimeMillis() - startTime;
                if (haController != null && haController instanceof HeartBeatCallback) {
                //成功
                    ((HeartBeatCallback) haController).onSuccess(costTime);
                }
            } catch (Throwable e) {
            //失败
                if (haController != null && haController instanceof HeartBeatCallback) {
                    ((HeartBeatCallback) haController).onFailed(e);
                }
                reconnect = true;
                logger.warn("connect failed by ", e);
            }

        }

       ...
    }

总结

总体来看,CanalInstance模块本身没有什么特别复杂的逻辑,它的核心处理都在parser、sink、store、metamanager等内部组件里。这些内部组件我们后面会有文章单独分析。

以上是关于canal 源码解析系列-CanalInstance模块解析的主要内容,如果未能解决你的问题,请参考以下文章

canal 源码解析系列-sink模块解析

canal 源码解析系列-EventParser模块解析1

canal 源码解析系列-工程结构说明

canal 源码解析系列-CanalServerWithEmbedded解读

canal 源码解析系列-canal的通信数据结构

canal 源码解析系列-EventParser模块解析2