canal 源码解析系列-EventParser模块解析2

Posted 犀牛饲养员

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了canal 源码解析系列-EventParser模块解析2相关的知识,希望对你有一定的参考价值。

继续接着第一篇的内容讲。本篇主要看详细看看dump的过程。前面的章节我们看到com.alibaba.otter.canal.parse.inbound.AbstractEventParser#start方法里把dump分为两种类型,并行和串行,如下:

// 4. 开始dump数据
                        // parallel默认是true,当然,是可以配置的。"canal.instance.parser.parallel"
                        //如果是并行解析,如果构建一个多阶段的协调处理器MultiStageCoprocessor
                        if (parallel) {
                            // build stage processor
                            multiStageCoprocessor = buildMultiStageCoprocessor();
                            ...
                            iprocessor.start();
                                
                            erosaConnection.dump(startPosition.getJournalName(),
                                        startPosition.getPosition(),
                                        multiStageCoprocessor);
                                        ...
 
                            }
                        } else {
                            //非并行情况下,直接使用sinkHandler处理数据,相当于只有一条串行的流,
                            ...
                                
                            erosaConnection.dump(startPosition.getJournalName(),
                                        startPosition.getPosition(),
                                        sinkHandler);
                            ...
                                }
                            }
                        }

先来看看简单的部分,串行处理逻辑。

/**
     *
     * @param binlogfilename binlog文件名
     * @param binlogPosition 文件中的偏移量
     * @param func 每解析出一条binlog日志的处理函数
     * @throws IOException
     */
    public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
        updateSettings();//在发送dump之前先设置相关的参数。
        loadBinlogChecksum();//获取主库checksum信息,命令:select @@global.binlog_checksum
        //向mysql Master 注册从节点,告知客户端的host、port、用户名与密码、serverId,具体实现是发送命令CODE为 0x15。
        sendRegisterSlave();
        sendBinlogDump(binlogfilename, binlogPosition);//向 MySQL Master 发送 dump 请求

        //构建 DirectLogFetcher对象,实现基于 socket 的日志拉取服务
        DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
        fetcher.start(connector.getChannel());//这个就是socket channel

        //LogDecoder 对象,用于解析 binlog 日志
        LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
        LogContext context = new LogContext();//保存上下文信息
        context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
        /**
         * 循环拉取消息,
         * 通过LogDecoder对二进制流进行解析,提取一条完整的binlog事件,交给 SinkFunction 去处理
         * 可以看出这是一个串行的处理
         */
        while (fetcher.fetch()) {
            accumulateReceivedBytes(fetcher.limit());
            LogEvent event = null;
            event = decoder.decode(fetcher, context);//解析成logevent

            if (event == null) {
                throw new CanalParseException("parse failed");
            }

            //sink里面的逻辑在com.alibaba.otter.canal.parse.inbound.AbstractEventParser.start方法里
            if (!func.sink(event)) {
                break;
            }

            //如果开启了半同步机制,需要向master发送ACK
            if (event.getSemival() == 1) {
                sendSemiAck(context.getLogPosition().getFileName(), context.getLogPosition().getPosition());
            }
        }
    }

这段代码比较简单,另外我的注释写的也比较详细,这里不做过多解释了。

再继续看看并行是怎么处理的,

@Override
    public void dump(String binlogfilename, Long binlogPosition, MultiStageCoprocessor coprocessor) throws IOException {
        updateSettings();//在发送dump之前先设置相关的参数。
        loadBinlogChecksum();//获取主库checksum信息,命令:select @@global.binlog_checksum
        sendRegisterSlave();//向MySQL Master 注册从节点,告知客户端的host、port、用户名与密码、serverId,具体实现是发送命令CODE为 0x15。
        sendBinlogDump(binlogfilename, binlogPosition);//向 MySQL Master 发送 dump 请求
        ((MysqlMultiStageCoprocessor) coprocessor).setConnection(this);
        ((MysqlMultiStageCoprocessor) coprocessor).setBinlogChecksum(binlogChecksum);
        //基于socket拉取数据
        try (DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize())) {
            fetcher.start(connector.getChannel());
            while (fetcher.fetch()) {//提取一条完整的binlog事件
                accumulateReceivedBytes(fetcher.limit());
                LogBuffer buffer = fetcher.duplicate();
                fetcher.consume(fetcher.limit());
                //存储在LogBuffer中的binlog数据被投递到MysqlMultiStageCoprocessor的disruptorMsgBuffer中
                if (!coprocessor.publish(buffer)) {
                    break;
                }
            }
        }
    }

注意到,和串行相比较,最后一个入参由SinkFunction变为了MultiStageCoprocessor,后者正是实现高性能dump的关键所在。这里看到,最终是调用了MultiStageCoprocessor的publish方法进行dump数据,不过在这之前我们来看看它的start的方法(com.alibaba.otter.canal.parse.inbound.mysql.MysqlMultiStageCoprocessor#start):

public void start() {
        super.start();
        this.exception = null;
        // 初始化RingBuffer,这里使用了disruptor框架,可以作为一个最佳实践来学习
        // 关于disruptor框架,自行搜索学习
        //disruptorMsgBuffer可以理解为一个无锁队列
        this.disruptorMsgBuffer = RingBuffer.createSingleProducer(new MessageEventFactory(),
            ringBufferSize,
            new BlockingWaitStrategy());
        //多线程解析的并发数
        int tc = parserThreadCount > 0 ? parserThreadCount : 1;
        this.parserExecutor = Executors.newFixedThreadPool(tc, new NamedThreadFactory("MultiStageCoprocessor-Parser-"
                                                                                      + destination));

        this.stageExecutor = Executors.newFixedThreadPool(2, new NamedThreadFactory("MultiStageCoprocessor-other-"
                                                                                    + destination));
        SequenceBarrier sequenceBarrier = disruptorMsgBuffer.newBarrier();
        ExceptionHandler exceptionHandler = new SimpleFatalExceptionHandler();
        // stage 2
        //阶段1 事件基本解析 (单线程,事件类型、DDL解析构造TableMeta、维护位点信息),调用SimpleParserStage.onEvent处理
        this.logContext = new LogContext();
        //BatchEventProcessor 是disruptor框架里的概念,多线程并发执行,不同线程执行不同的event
        simpleParserStage = new BatchEventProcessor<>(disruptorMsgBuffer,
            sequenceBarrier,
            new SimpleParserStage(logContext));
        simpleParserStage.setExceptionHandler(exceptionHandler);
        disruptorMsgBuffer.addGatingSequences(simpleParserStage.getSequence());

        // stage 3
        // 阶段2 事件深度解析 (多线程, DML事件数据的完整解析) 调用DmlParserStage.onEvent处理
        SequenceBarrier dmlParserSequenceBarrier = disruptorMsgBuffer.newBarrier(simpleParserStage.getSequence());
        WorkHandler<MessageEvent>[] workHandlers = new DmlParserStage[tc];
        for (int i = 0; i < tc; i++) {
            workHandlers[i] = new DmlParserStage();
        }
        workerPool = new WorkerPool<MessageEvent>(disruptorMsgBuffer,
            dmlParserSequenceBarrier,
            exceptionHandler,
            workHandlers);
        Sequence[] sequence = workerPool.getWorkerSequences();
        disruptorMsgBuffer.addGatingSequences(sequence);

        // stage 4
        // 阶段3 投递到store (单线程),调用SinkStoreStage处理
        SequenceBarrier sinkSequenceBarrier = disruptorMsgBuffer.newBarrier(sequence);
        sinkStoreStage = new BatchEventProcessor<>(disruptorMsgBuffer, sinkSequenceBarrier, new SinkStoreStage());
        sinkStoreStage.setExceptionHandler(exceptionHandler);
        disruptorMsgBuffer.addGatingSequences(sinkStoreStage.getSequence());

        // start work
        stageExecutor.submit(simpleParserStage);//提交任务(单线程)
        stageExecutor.submit(sinkStoreStage);//提交任务(单线程)
        workerPool.start(parserExecutor);
    }

Canal在并行解析binlog的实现中使用了Disruptor,Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。这里不打算详细讲Disruptor了,感兴趣推荐一篇我认为讲得比较好的文章:

高性能队列——Disruptor

如果要完全理解canal并行解析这部分,是需要熟悉Disruptor怎么使用的。

结合注释可以看出,start方法基本就是用来初始化Disruptor需要用到的各种组件的。比如disruptorMsgBuffer就是一个生产者的RingBuffer,创建SequenceBarrier ,用于平衡生产者和消费者速率,另外还有消息处理器BatchEventProcessor等。

再继续来看看publish方法,这个方法表示Disruptor进行生产消息。消息的来源就是解析的binlog事件消息。

//投递数据
    private boolean publish(LogBuffer buffer, LogEvent event) {
       ...
        boolean interupted = false;
        long blockingStart = 0L;
        int fullTimes = 0;
        do {
            /**
             * 由于改为processor仅终止自身stage而不是stop,那么需要由incident标识coprocessor是否正常工作。
             * 让dump线程能够及时感知
             */
            if (exception != null) {
                throw exception;
            }
            try {
                long next = disruptorMsgBuffer.tryNext();//取队列下一个可用的位置,不会block
                MessageEvent data = disruptorMsgBuffer.get(next);//拿数据
                if (buffer != null) {
                    data.setBuffer(buffer);
                } else {
                    data.setEvent(event);
                }
                disruptorMsgBuffer.publish(next);//调用RingBuffer的publish生产消息
                if (fullTimes > 0) {
                    eventsPublishBlockingTime.addAndGet(System.nanoTime() - blockingStart);
                }
                break;
            } catch (InsufficientCapacityException e) {
                //判断每次申请的空间是否已经被其他生产者占据。假如已经被占据,会抛出InsufficientCapacityException
                if (fullTimes == 0) {
                    blockingStart = System.nanoTime();
                }
                // park
                // LockSupport.parkNanos(1L);
                
                
                `applyWait`(++fullTimes);
                interupted = Thread.interrupted();
                if (fullTimes % 1000 == 0) {
                    long nextStart = System.nanoTime();
                    eventsPublishBlockingTime.addAndGet(nextStart - blockingStart);
                    blockingStart = nextStart;
                }
            }
        } while (!interupted && isStart());
        return isStart();
    }

applyWait是防止cpu空转的:

// 处理无数据的情况,避免空循环挂死
    private void applyWait(int fullTimes) {
        int newFullTimes = fullTimes > maxFullTimes ? maxFullTimes : fullTimes;
        if (fullTimes <= 3) { // 3次以内
            //它让掉当前线程 CPU 的时间片,使正在运行中的线程重新变成就绪状态,并重新竞争 CPU 的调度权。它可能会获取到,也有可能被其他线程获取到。
            Thread.yield();
        } else { // 超过3次,最多只sleep 1ms
            LockSupport.parkNanos(100 * 1000L * newFullTimes);
        }

    }

生产消息(binlog事件消息)是在publish方法,那么消费消息的逻辑在哪里呢?消费的逻辑就是Disruptor里的EventHandler,也就是我们前面看到的SimpleParserStage,DmlParserStagesinkStoreStage,这三个是按照顺序依次进行处理,分别完成的工作是基本解析、深度解析和投递数据到store。

关于sink和store的具体过程和源码解析,我打算放到下一篇文章来讲。

总结

EventParser 的主要职责将自己伪装成 MySQL 服务器的一个从节点,从服务器端接收 binlog日志,解析,然后存储到下游的输出端(kafka等)。这其中有些技术细节值得我们深入探讨或者学习,比如实现了基于GTID、日志位点偏移量两种方式定位 binlog 日志方式,比如引入了disruptor 框架来支持高性能的解析-存储过程等。

以上是关于canal 源码解析系列-EventParser模块解析2的主要内容,如果未能解决你的问题,请参考以下文章

canal 源码解析系列-CanalInstance模块解析

canal 源码解析系列-canal的HA机制解析

canal 源码解析系列-sink模块解析

canal 源码解析系列-工程结构说明

canal 源码解析系列-CanalServerWithEmbedded解读

canal 源码解析系列-canal的通信数据结构