canal 源码解析系列-CanalInstance模块解析
Posted 犀牛饲养员
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了canal 源码解析系列-CanalInstance模块解析相关的知识,希望对你有一定的参考价值。
下面涉及到源码的地方,我都经过了处理,删减了一些不重要的代码(比如参数校验),便于理解
正文
上一篇文章
canal 源码解析系列-CanalServerWithEmbedded解读
提到了CanalServerWithEmbedded
内部管理所有的CanalInstance
,通过 Client 的信息(destination),找到 Client 订阅的 CanalInstance,然后调用 CanalInstance 内部的各个模块进行处理。
本篇就来深入解读下CanalInstance模块。先看幅图,
instance代表了一个实际运行的数据队列,包括了EventPaser,EventSink,EventStore等组件。每个组件后面我们会有单独的文章专门分析。
从这幅图我们可以看出instance是怎么生成的。
CanalInstanceGenerator
相当于一个工厂类,通过 destination 产生特定的 CanalInstance,它有两个实现:
ManagerCanalInstanceGenerator
类,manager方式: 和你自己的内部web console/manager系统进行对接。(目前主要是公司内部使用)SpringCanalInstanceGenerator
类,spring方式:基于spring xml + properties进行定义,构建spring配置.
具体使用哪个,是通过配置的,如下所示:
canal.properties文件
canal.instance.global.mode = spring
com.alibaba.otter.canal.deployer.CanalController#initGlobalConfig方法,
...
if (config.getMode().isManager()) {
PlainCanalInstanceGenerator instanceGenerator = new PlainCanalInstanceGenerator(properties);
instanceGenerator.setCanalConfigClient(managerClients.get(config.getManagerAddress()));
instanceGenerator.setSpringXml(config.getSpringXml());
return instanceGenerator.generate(destination);
} else if (config.getMode().isSpring()) {
SpringCanalInstanceGenerator instanceGenerator = new SpringCanalInstanceGenerator();
instanceGenerator.setSpringXml(config.getSpringXml());
return instanceGenerator.generate(destination);
} else {
throw new UnsupportedOperationException("unknow mode :" + config.getMode());
}
...
先来看下spring的版本实现,
public CanalInstance generate(String destination) {
synchronized (CanalEventParser.class) {
try {
// 设置当前正在加载的通道,加载spring查找文件时会用到该变量
System.setProperty("canal.instance.destination", destination);
//file-instance.xml文件,里面定义了一些bean
this.beanFactory = getBeanFactory(springXml);
String beanName = destination;
if (!beanFactory.containsBean(beanName)) {
beanName = defaultName;
}
...
}
}
canal提供了几种spring配置文件的模版给我们选择,如下图所示:
- spring/memory-instance.xml
- spring/file-instance.xml
- spring/default-instance.xml
- spring/group-instance.xml
然后部署的时候,我们可以通过在canal.properties
配置文件中指定使用哪个文件:
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
这几个文件的主要区别是,metaManager 和eventParser 这两个配置有所不同,可能在内存、文件或zk进行存储。
spring的配置文件示例如下:
<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
<property name="destination" value="${canal.instance.destination}" />
<property name="eventParser">
<ref bean="eventParser" />
</property>
<property name="eventSink">
<ref bean="eventSink" />
</property>
<property name="eventStore">
<ref bean="eventStore" />
</property>
<property name="metaManager">
<ref bean="metaManager" />
</property>
<property name="alarmHandler">
<ref bean="alarmHandler" />
</property>
</bean>
generate
方法返回的是CanalInstanceWithSpring
这个实现类,它继承自AbstractCanalInstance
,并且实现了CanalInstance
。这个类的实现只有几十行,之所以这么少是因为大部分的逻辑都已经通过spring的配置文件实现了,如下:
<!-- 报警处理类 -->
<bean id="alarmHandler" class="com.alibaba.otter.canal.common.alarm.LogAlarmHandler" />
<bean id="metaManager" class="com.alibaba.otter.canal.meta.FileMixedMetaManager">
<property name="dataDir" value="${canal.file.data.dir:../conf}" />
<property name="period" value="${canal.file.flush.period:1000}" />
</bean>
<bean id="eventStore" class="com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer">
<property name="bufferSize" value="${canal.instance.memory.buffer.size:16384}" />
<property name="bufferMemUnit" value="${canal.instance.memory.buffer.memunit:1024}" />
<property name="batchMode" value="${canal.instance.memory.batch.mode:MEMSIZE}" />
</bean>
<bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">
<property name="eventStore" ref="eventStore" />
</bean>
...
ManagerCanalInstanceGenerator
实现类似,这里就不多说了。它返回的是CanalInstanceWithManager
,同样它继承自AbstractCanalInstance
,并且实现了CanalInstance
。类图如下:
核心的功能都在AbstractCanalInstance类中,我们来看下。
// 通知下订阅关系变化
@Override
public boolean subscribeChange(ClientIdentity identity) {
if (StringUtils.isNotEmpty(identity.getFilter())) {
logger.info("subscribe filter change to " + identity.getFilter());
//订阅关系发生变化触发,主要是更新filter,这个filter决定了binlog订阅的库,表
AviaterRegexFilter aviaterFilter = new AviaterRegexFilter(identity.getFilter());
boolean isGroup = (eventParser instanceof GroupEventParser);
if (isGroup) {
// 处理group的模式
List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers();
for (CanalEventParser singleEventParser : eventParsers) {// 需要遍历启动
if(singleEventParser instanceof AbstractEventParser) {
((AbstractEventParser) singleEventParser).setEventFilter(aviaterFilter);
}
}
} else {
if(eventParser instanceof AbstractEventParser) {
((AbstractEventParser) eventParser).setEventFilter(aviaterFilter);
}
}
}
// filter的处理规则
// a. parser处理数据过滤处理
// b. sink处理数据的路由&分发,一份parse数据经过sink后可以分发为多份,每份的数据可以根据自己的过滤规则不同而有不同的数据
// 后续内存版的一对多分发,可以考虑
return true;
}
start方法和stop方法没什么可讲的,就是启停instance内部的组件。
beforeStartEventParser
和afterStartEventParser
是eventParser启动的前置和后置操作。前者调用了startEventParserInternal
,后者调用了stopEventParserInternal
,
protected void startEventParserInternal(CanalEventParser eventParser, boolean isGroup) {
if (eventParser instanceof AbstractEventParser) {
...
// 首先启动log position管理器
CanalLogPositionManager logPositionManager = abstractEventParser.getLogPositionManager();
if (!logPositionManager.isStart()) {
logPositionManager.start();
}
}
...
CanalHAController haController = mysqlEventParser.getHaController();
...
haController.start();
}
}
protected void stopEventParserInternal(CanalEventParser eventParser) {
if (eventParser instanceof AbstractEventParser) {
...
// 首先启动log position管理器
CanalLogPositionManager logPositionManager = abstractEventParser.getLogPositionManager();
if (logPositionManager.isStart()) {
logPositionManager.stop();
}
}
....
CanalHAController haController = mysqlEventParser.getHaController();
if (haController.isStart()) {
haController.stop();
}
}
就是分别负责了CanalLogPositionManager
和CanalHAController
的启动停止工作。
CanalLogPositionManager记录binlog最后一次解析成功位置,有不同的实现,可以保存在内存,zk等存在介质里。mysql在主从同步过程中,slave自己需要维护binlog的消费进度信息。而canal伪装成slave,因此也要维护这样的信息。
CanalHAController主要是通过失败检测, 控制 EventParser 的链接主机管理,判断当前该链接哪个mysql数据库。它只有一个实现类HeartBeatHAController
失败转换的逻辑也很简单,定时发送心跳语句到当前链接的数据库,超过一定次数检测失败时,尝试切换到备机
//心跳发送成功
public void onSuccess(long costTime) {
failedTimes = 0;
}
//心跳发送失败
public void onFailed(Throwable e) {
failedTimes++;
// 检查一下是否超过失败次数
synchronized (this) {
if (failedTimes > detectingRetryTimes) {
if (switchEnable) {
eventParser.doSwitch();// 通知执行一次切换
failedTimes = 0;
} else {
logger.warn("HeartBeat failed Times:{} , should auto switch ?", failedTimes);
}
}
}
}
心跳的逻辑在 com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.MysqlDetectingTimeTask 实现,是个定时器。
class MysqlDetectingTimeTask extends TimerTask {
...
public void run() {
try {
...
// 可能心跳sql为select 1
if (StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "select")
|| StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "show")
|| StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "explain")
|| StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "desc")) {
mysqlConnection.query(detectingSQL);
} else {
mysqlConnection.update(detectingSQL);
}
long costTime = System.currentTimeMillis() - startTime;
if (haController != null && haController instanceof HeartBeatCallback) {
//成功
((HeartBeatCallback) haController).onSuccess(costTime);
}
} catch (Throwable e) {
//失败
if (haController != null && haController instanceof HeartBeatCallback) {
((HeartBeatCallback) haController).onFailed(e);
}
reconnect = true;
logger.warn("connect failed by ", e);
}
}
...
}
总结
总体来看,CanalInstance
模块本身没有什么特别复杂的逻辑,它的核心处理都在parser、sink、store、metamanager等内部组件里。这些内部组件我们后面会有文章单独分析。
以上是关于canal 源码解析系列-CanalInstance模块解析的主要内容,如果未能解决你的问题,请参考以下文章