canal 源码解析系列-EventParser模块解析2
Posted 犀牛饲养员
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了canal 源码解析系列-EventParser模块解析2相关的知识,希望对你有一定的参考价值。
继续接着第一篇的内容讲。本篇主要看详细看看dump的过程。前面的章节我们看到com.alibaba.otter.canal.parse.inbound.AbstractEventParser#start
方法里把dump分为两种类型,并行和串行,如下:
// 4. 开始dump数据
// parallel默认是true,当然,是可以配置的。"canal.instance.parser.parallel"
//如果是并行解析,如果构建一个多阶段的协调处理器MultiStageCoprocessor
if (parallel) {
// build stage processor
multiStageCoprocessor = buildMultiStageCoprocessor();
...
iprocessor.start();
erosaConnection.dump(startPosition.getJournalName(),
startPosition.getPosition(),
multiStageCoprocessor);
...
}
} else {
//非并行情况下,直接使用sinkHandler处理数据,相当于只有一条串行的流,
...
erosaConnection.dump(startPosition.getJournalName(),
startPosition.getPosition(),
sinkHandler);
...
}
}
}
先来看看简单的部分,串行处理逻辑。
/**
*
* @param binlogfilename binlog文件名
* @param binlogPosition 文件中的偏移量
* @param func 每解析出一条binlog日志的处理函数
* @throws IOException
*/
public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
updateSettings();//在发送dump之前先设置相关的参数。
loadBinlogChecksum();//获取主库checksum信息,命令:select @@global.binlog_checksum
//向mysql Master 注册从节点,告知客户端的host、port、用户名与密码、serverId,具体实现是发送命令CODE为 0x15。
sendRegisterSlave();
sendBinlogDump(binlogfilename, binlogPosition);//向 MySQL Master 发送 dump 请求
//构建 DirectLogFetcher对象,实现基于 socket 的日志拉取服务
DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
fetcher.start(connector.getChannel());//这个就是socket channel
//LogDecoder 对象,用于解析 binlog 日志
LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
LogContext context = new LogContext();//保存上下文信息
context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
/**
* 循环拉取消息,
* 通过LogDecoder对二进制流进行解析,提取一条完整的binlog事件,交给 SinkFunction 去处理
* 可以看出这是一个串行的处理
*/
while (fetcher.fetch()) {
accumulateReceivedBytes(fetcher.limit());
LogEvent event = null;
event = decoder.decode(fetcher, context);//解析成logevent
if (event == null) {
throw new CanalParseException("parse failed");
}
//sink里面的逻辑在com.alibaba.otter.canal.parse.inbound.AbstractEventParser.start方法里
if (!func.sink(event)) {
break;
}
//如果开启了半同步机制,需要向master发送ACK
if (event.getSemival() == 1) {
sendSemiAck(context.getLogPosition().getFileName(), context.getLogPosition().getPosition());
}
}
}
这段代码比较简单,另外我的注释写的也比较详细,这里不做过多解释了。
再继续看看并行是怎么处理的,
@Override
public void dump(String binlogfilename, Long binlogPosition, MultiStageCoprocessor coprocessor) throws IOException {
updateSettings();//在发送dump之前先设置相关的参数。
loadBinlogChecksum();//获取主库checksum信息,命令:select @@global.binlog_checksum
sendRegisterSlave();//向MySQL Master 注册从节点,告知客户端的host、port、用户名与密码、serverId,具体实现是发送命令CODE为 0x15。
sendBinlogDump(binlogfilename, binlogPosition);//向 MySQL Master 发送 dump 请求
((MysqlMultiStageCoprocessor) coprocessor).setConnection(this);
((MysqlMultiStageCoprocessor) coprocessor).setBinlogChecksum(binlogChecksum);
//基于socket拉取数据
try (DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize())) {
fetcher.start(connector.getChannel());
while (fetcher.fetch()) {//提取一条完整的binlog事件
accumulateReceivedBytes(fetcher.limit());
LogBuffer buffer = fetcher.duplicate();
fetcher.consume(fetcher.limit());
//存储在LogBuffer中的binlog数据被投递到MysqlMultiStageCoprocessor的disruptorMsgBuffer中
if (!coprocessor.publish(buffer)) {
break;
}
}
}
}
注意到,和串行相比较,最后一个入参由SinkFunction
变为了MultiStageCoprocessor
,后者正是实现高性能dump的关键所在。这里看到,最终是调用了MultiStageCoprocessor的publish方法进行dump数据,不过在这之前我们来看看它的start的方法(com.alibaba.otter.canal.parse.inbound.mysql.MysqlMultiStageCoprocessor#start):
public void start() {
super.start();
this.exception = null;
// 初始化RingBuffer,这里使用了disruptor框架,可以作为一个最佳实践来学习
// 关于disruptor框架,自行搜索学习
//disruptorMsgBuffer可以理解为一个无锁队列
this.disruptorMsgBuffer = RingBuffer.createSingleProducer(new MessageEventFactory(),
ringBufferSize,
new BlockingWaitStrategy());
//多线程解析的并发数
int tc = parserThreadCount > 0 ? parserThreadCount : 1;
this.parserExecutor = Executors.newFixedThreadPool(tc, new NamedThreadFactory("MultiStageCoprocessor-Parser-"
+ destination));
this.stageExecutor = Executors.newFixedThreadPool(2, new NamedThreadFactory("MultiStageCoprocessor-other-"
+ destination));
SequenceBarrier sequenceBarrier = disruptorMsgBuffer.newBarrier();
ExceptionHandler exceptionHandler = new SimpleFatalExceptionHandler();
// stage 2
//阶段1 事件基本解析 (单线程,事件类型、DDL解析构造TableMeta、维护位点信息),调用SimpleParserStage.onEvent处理
this.logContext = new LogContext();
//BatchEventProcessor 是disruptor框架里的概念,多线程并发执行,不同线程执行不同的event
simpleParserStage = new BatchEventProcessor<>(disruptorMsgBuffer,
sequenceBarrier,
new SimpleParserStage(logContext));
simpleParserStage.setExceptionHandler(exceptionHandler);
disruptorMsgBuffer.addGatingSequences(simpleParserStage.getSequence());
// stage 3
// 阶段2 事件深度解析 (多线程, DML事件数据的完整解析) 调用DmlParserStage.onEvent处理
SequenceBarrier dmlParserSequenceBarrier = disruptorMsgBuffer.newBarrier(simpleParserStage.getSequence());
WorkHandler<MessageEvent>[] workHandlers = new DmlParserStage[tc];
for (int i = 0; i < tc; i++) {
workHandlers[i] = new DmlParserStage();
}
workerPool = new WorkerPool<MessageEvent>(disruptorMsgBuffer,
dmlParserSequenceBarrier,
exceptionHandler,
workHandlers);
Sequence[] sequence = workerPool.getWorkerSequences();
disruptorMsgBuffer.addGatingSequences(sequence);
// stage 4
// 阶段3 投递到store (单线程),调用SinkStoreStage处理
SequenceBarrier sinkSequenceBarrier = disruptorMsgBuffer.newBarrier(sequence);
sinkStoreStage = new BatchEventProcessor<>(disruptorMsgBuffer, sinkSequenceBarrier, new SinkStoreStage());
sinkStoreStage.setExceptionHandler(exceptionHandler);
disruptorMsgBuffer.addGatingSequences(sinkStoreStage.getSequence());
// start work
stageExecutor.submit(simpleParserStage);//提交任务(单线程)
stageExecutor.submit(sinkStoreStage);//提交任务(单线程)
workerPool.start(parserExecutor);
}
Canal在并行解析binlog的实现中使用了Disruptor
,Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。这里不打算详细讲Disruptor了,感兴趣推荐一篇我认为讲得比较好的文章:
如果要完全理解canal并行解析这部分,是需要熟悉Disruptor怎么使用的。
结合注释可以看出,start方法基本就是用来初始化Disruptor需要用到的各种组件的。比如disruptorMsgBuffer
就是一个生产者的RingBuffer,创建SequenceBarrier
,用于平衡生产者和消费者速率,另外还有消息处理器BatchEventProcessor
等。
再继续来看看publish方法,这个方法表示Disruptor进行生产消息。消息的来源就是解析的binlog事件消息。
//投递数据
private boolean publish(LogBuffer buffer, LogEvent event) {
...
boolean interupted = false;
long blockingStart = 0L;
int fullTimes = 0;
do {
/**
* 由于改为processor仅终止自身stage而不是stop,那么需要由incident标识coprocessor是否正常工作。
* 让dump线程能够及时感知
*/
if (exception != null) {
throw exception;
}
try {
long next = disruptorMsgBuffer.tryNext();//取队列下一个可用的位置,不会block
MessageEvent data = disruptorMsgBuffer.get(next);//拿数据
if (buffer != null) {
data.setBuffer(buffer);
} else {
data.setEvent(event);
}
disruptorMsgBuffer.publish(next);//调用RingBuffer的publish生产消息
if (fullTimes > 0) {
eventsPublishBlockingTime.addAndGet(System.nanoTime() - blockingStart);
}
break;
} catch (InsufficientCapacityException e) {
//判断每次申请的空间是否已经被其他生产者占据。假如已经被占据,会抛出InsufficientCapacityException
if (fullTimes == 0) {
blockingStart = System.nanoTime();
}
// park
// LockSupport.parkNanos(1L);
`applyWait`(++fullTimes);
interupted = Thread.interrupted();
if (fullTimes % 1000 == 0) {
long nextStart = System.nanoTime();
eventsPublishBlockingTime.addAndGet(nextStart - blockingStart);
blockingStart = nextStart;
}
}
} while (!interupted && isStart());
return isStart();
}
applyWait
是防止cpu空转的:
// 处理无数据的情况,避免空循环挂死
private void applyWait(int fullTimes) {
int newFullTimes = fullTimes > maxFullTimes ? maxFullTimes : fullTimes;
if (fullTimes <= 3) { // 3次以内
//它让掉当前线程 CPU 的时间片,使正在运行中的线程重新变成就绪状态,并重新竞争 CPU 的调度权。它可能会获取到,也有可能被其他线程获取到。
Thread.yield();
} else { // 超过3次,最多只sleep 1ms
LockSupport.parkNanos(100 * 1000L * newFullTimes);
}
}
生产消息(binlog事件消息)是在publish方法,那么消费消息的逻辑在哪里呢?消费的逻辑就是Disruptor里的EventHandler,也就是我们前面看到的SimpleParserStage
,DmlParserStage
和sinkStoreStage
,这三个是按照顺序依次进行处理,分别完成的工作是基本解析、深度解析和投递数据到store。
关于sink和store的具体过程和源码解析,我打算放到下一篇文章来讲。
总结
EventParser 的主要职责将自己伪装成 MySQL 服务器的一个从节点,从服务器端接收 binlog日志,解析,然后存储到下游的输出端(kafka等)。这其中有些技术细节值得我们深入探讨或者学习,比如实现了基于GTID、日志位点偏移量两种方式定位 binlog 日志方式,比如引入了disruptor 框架来支持高性能的解析-存储过程等。
以上是关于canal 源码解析系列-EventParser模块解析2的主要内容,如果未能解决你的问题,请参考以下文章
canal 源码解析系列-CanalInstance模块解析