canal 源码解析系列-CanalServerWithEmbedded解读

Posted 犀牛饲养员

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了canal 源码解析系列-CanalServerWithEmbedded解读相关的知识,希望对你有一定的参考价值。

前面的文章简单说过这个类。canal server模块的核心接口为CanalServer,有两个实现:

  • CanalServerWithEmbedded
  • CanalServerWithNetty

这两个实现代表了canal的两种应用模式,CanalServerWithNetty在canal独立部署场景发挥作用,开发者只需要实现cient,不同的应用通过canal client与canal server进行通信,canal client的请求统一由CanalServerWithNetty接受进行处理。

而通过CanalServerWithEmbeded,可以不需要独立部署canal,而是把canal嵌入到我们自己的服务里。但是这种对开发者的要求就比较高。

下面的图表示二者的关系,

从图上可以看出,CanalServerWithNetty是client通过http访问的canal的门户,而它往下其实也是调用CanalServerWithEmbedded来完成具体的工作。所以我们就来具体说说这个CanalServerWithEmbedded

从上图还可以看出,CanalServerWithEmbedded 内部管理所有的CanalInstance,通过 Client 的信息(destination),找到 Client 订阅的 CanalInstance,然后调用 CanalInstance 内部的各个模块进行处理。关于instance后面会有专门的文章分析,这里暂且不表。

下面涉及到源码的地方,我都经过了处理,删减了一些不重要的代码(比如参数校验),便于理解

canal处理client请求的核心逻辑都在SessionHandler这个类中,这个handler在接收到客户端请求是调用CanalServerWithEmbeded对应的方法进行处理,如下所示:

    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        logger.info("message receives in session handler...");
        long start = System.nanoTime();
        ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
        Packet packet = Packet.parseFrom(buffer.readBytes(buffer.readableBytes()).array());
        ClientIdentity clientIdentity = null;
        try {
            switch (packet.getType()) {
                case SUBSCRIPTION://订阅请求
                    ...

                        embeddedServer.subscribe(clientIdentity);
                        ...
                    } else {
                        byte[] errorBytes = NettyUtils.errorPacket(401,
                            MessageFormatter.format("destination or clientId is null", sub.toString()).getMessage());
                        NettyUtils.write(ctx.getChannel(),
                            errorBytes,
                            new ChannelFutureAggregator(sub.getDestination(),
                                sub,
                                packet.getType(),
                                errorBytes.length,
                                System.nanoTime() - start,
                                (short) 401));
                    }
                    break;
                case UNSUBSCRIPTION://取消订阅
                    ...
                        embeddedServer.unsubscribe(clientIdentity);
                        stopCanalInstanceIfNecessary(clientIdentity);// 尝试关闭
                        ...
                    break;
                case GET://获取binlog
                    Get get = CanalPacket.Get.parseFrom(packet.getBody());
                    ...
                        if (get.getTimeout() == -1) {// 是否是初始值
                            message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());
                        } else {
                            TimeUnit unit = convertTimeUnit(get.getUnit());
                            message = embeddedServer.getWithoutAck(clientIdentity,
                                get.getFetchSize(),
                                get.getTimeout(),
                                unit);
                        }
                        // }

                        ...
                    break;
                case CLIENTACK://客户端消费成功ack
                    ClientAck ack = CanalPacket.ClientAck.parseFrom(packet.getBody());
                    ...
                        clientIdentity = new ClientIdentity(ack.getDestination(), Short.valueOf(ack.getClientId()));
                        embeddedServer.ack(clientIdentity, ack.getBatchId());
                        ...
                                                break;
                case CLIENTROLLBACK://客户端消费失败回滚请求
                    ...
                        if (rollback.getBatchId() == 0L) {
                            embeddedServer.rollback(clientIdentity);// 回滚所有批次
                        } else {
                            embeddedServer.rollback(clientIdentity, rollback.getBatchId()); // 只回滚单个批次
                        }
                        ...
                                            break;
                default:
                    byte[] errorBytes = NettyUtils.errorPacket(400,
                        MessageFormatter.format("packet type={} is NOT supported!", packet.getType()).getMessage());
                    NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(ctx.getChannel()
                        .getRemoteAddress()
                        .toString(), null, packet.getType(), errorBytes.length, System.nanoTime() - start, (short) 400));
                    break;
    }

来看下CanalServerWithEmbedded类,

public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements CanalServer, CanalService {

    private static final Logger        logger  = LoggerFactory.getLogger(CanalServerWithEmbedded.class);
    private Map<String, CanalInstance> canalInstances;//destination --> CanalInstance
    ...

实现了一个抽象类,两个接口。里面的一个map存放的是destination和CanalInstance的对应关系,这个我们前面的文章说过。

AbstractCanalLifeCycleCanalServer主要是继承的start和stop方法,很简单,这里不表。来看看实现的CanalService的接口。

public interface CanalService {

    //订阅
    void subscribe(ClientIdentity clientIdentity) throws CanalServerException;
    //取消订阅
    void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException;
    //获取数据,自动ack
    Message get(ClientIdentity clientIdentity, int batchSize) throws CanalServerException;
    //获取数据,自动ack,可以指定超时时间
    Message get(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit) throws CanalServerException;
    //获取数据,不ack
    Message getWithoutAck(ClientIdentity clientIdentity, int batchSize) throws CanalServerException;
    //获取数据,不ack,超时时间
    Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
                                                                                                    throws CanalServerException;
    //ack某个批次的数据
    void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException;
    //回滚没有ack的批次的数据
    void rollback(ClientIdentity clientIdentity) throws CanalServerException;
    //回滚某个批次的数据
    void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException;
}

先来看看订阅方法,

/**
     * 客户端订阅,重复订阅时会更新对应的filter信息
     */
    @Override
    public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
        checkStart(clientIdentity.getDestination());//检查 destination对应的instance已经启动

        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        //instance持有的metaManager没启动的话就重新启动
        if (!canalInstance.getMetaManager().isStart()) {
            canalInstance.getMetaManager().start();
        }

        //通过CanalInstance的CanalMetaManager组件进行元数据管理,记录一下当前这个CanalInstance有客户端在订阅
        canalInstance.getMetaManager().subscribe(clientIdentity); // 执行一下meta订阅

        //客户端当前订阅的binlog位置
        Position position = canalInstance.getMetaManager().getCursor(clientIdentity);
        if (position == null) {
            //如果是第一次订阅,尝试从CanalEventStore中获取第一个binlog的位置,作为客户端订阅开始的位置。
            position = canalInstance.getEventStore().getFirstPosition();// 获取一下store中的第一条
            if (position != null) {
                canalInstance.getMetaManager().updateCursor(clientIdentity, position); // 更新一下cursor
            }
            logger.info("subscribe successfully, {} with first position:{} ", clientIdentity, position);
        } else {
            logger.info("subscribe successfully, {} use last cursor position:{} ", clientIdentity, position);
        }

        // 通知下订阅关系变化
        canalInstance.subscribeChange(clientIdentity);
    }

解释下。首先可以看到是通过canalInstance下的MetaManager管理订阅,包括binlog的位置这些都在MetaManager下保存。CanalMetaManager接口有几个实现类:

FileMixedMetaManager
MemoryMetaManager
MixedMetaManager
PeriodMixedMetaManager
ZooKeeperMetaManager

这些实现类之间有些会持有其它实现的引用来装饰自己的功能。具体服务中使用哪个实现是我们可以设置的,通过指定对应的枚举设置,下面是个例子:

public static enum MetaMode {
        /** 内存存储模式 */
        MEMORY,
        /** 文件存储模式 */
        ZOOKEEPER,
        /** 混合模式,内存+文件 */
        MIXED,
        /** 本地文件存储模式 */
        LOCAL_FILE;
...

CanalParameter parameter = new CanalParameter();

        parameter.setZkClusters(Arrays.asList(cluster1));
        parameter.setMetaMode(MetaMode.MEMORY);
        ...

然后是取消订阅方法,

/**
     * 取消订阅
     */
    @Override
    public void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException {
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        canalInstance.getMetaManager().unsubscribe(clientIdentity); // 执行一下meta订阅

        logger.info("unsubscribe successfully, {}", clientIdentity);
    }
    
    

这个比较简单,不做过多解释。

看下get方法,

/**
     * 获取数据
     *
     * <pre>
     * 注意: meta获取和数据的获取需要保证顺序性,优先拿到meta的,一定也会是优先拿到数据,所以需要加同步. (不能出现先拿到meta,拿到第二批数据,这样就会导致数据顺序性出现问题)
     * </pre>
     */
    @Override
    public Message get(ClientIdentity clientIdentity, int batchSize) throws CanalServerException {
        return get(clientIdentity, batchSize, null, null);
    }

最终调用的是下面这个,

/**
     * 获取数据,可以指定超时时间.
     *
     * <pre>
     * 几种case:
     * a. 如果timeout为null,则采用tryGet方式,即时获取
     * b. 如果timeout不为null
     *    1. timeout为0,则采用get阻塞方式,获取数据,不设置超时,直到有足够的batchSize数据才返回
     *    2. timeout不为0,则采用get+timeout方式,获取数据,超时还没有batchSize足够的数据,有多少返回多少
     * 
     * 注意: meta获取和数据的获取需要保证顺序性,优先拿到meta的,一定也会是优先拿到数据,所以需要加同步. (不能出现先拿到meta,拿到第二批数据,这样就会导致数据顺序性出现问题)
     * </pre>
     */
    @Override
    public Message get(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
                                                                                                 throws CanalServerException {
        ...
        synchronized (canalInstance) {//会读写流数据,这里加锁防止并发
            // 获取到流式数据中的最后一批获取的位置
            //从CanalMetaManager中获取最后一个没有ack的binlog批次的位置信息
            PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().getLastestBatch(clientIdentity);

            if (positionRanges != null) {
                throw new CanalServerException(String.format("clientId:%s has last batch:[%s] isn't ack , maybe loss data",
                    clientIdentity.getClientId(),
                    positionRanges));
            }

            Events<Event> events = null;
            //从当前store中的第一条开始获取
            Position start = canalInstance.getMetaManager().getCursor(clientIdentity);
            events = getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit);

            if (CollectionUtils.isEmpty(events.getEvents())) {
                logger.debug("get successfully, clientId:{} batchSize:{} but result is null",
                    clientIdentity.getClientId(),
                    batchSize);
                //如果获取到的binlog消息为空,构造一个空的Message对象,将batchId设置为-1返回给客户端
                return new Message(-1, true, new ArrayList()); // 返回空包,避免生成batchId,浪费性能
            } else {
                // 记录到流式信息
                //如果获取到了binlog消息,将这个批次的binlog消息记录到CanalMetaMaager中,并生成一个唯一的batchId
                Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
                boolean raw = isRaw(canalInstance.getEventStore());
                List entrys = null;
                if (raw) {
                    entrys = Lists.transform(events.getEvents(), Event::getRawEntry);
                } else {
                    entrys = Lists.transform(events.getEvents(), Event::getEntry);
                }
                if (logger.isInfoEnabled()) {
                    logger.info("get successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",
                        clientIdentity.getClientId(),
                        batchSize,
                        entrys.size(),
                        batchId,
                        events.getPositionRange());
                }
                // 直接提交ack
                ack(clientIdentity, batchId);
                //根据获取到的信息,构造message返回
                return new Message(batchId, raw, entrys);
            }
        }
    }

getWithoutAck就是不带ack的get,所以这里就不多说了。

ack方法如下,

/**
     * 进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
     *
     * <pre>
     * 注意:进行反馈时必须按照batchId的顺序进行ack(需有客户端保证)
     * </pre>
     */
    @Override
    public void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException {
        ...

        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        PositionRange<LogPosition> positionRanges = null;
        positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity, batchId); // 更新位置
        if (positionRanges == null) { // 说明是重复的ack/rollback
            throw new CanalServerException(String.format("ack error , clientId:%s batchId:%d is not exist , please check",
                clientIdentity.getClientId(),
                batchId));
        }

    

        // 更新cursor
        if (positionRanges.getAck() != null) {
            canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
            if (logger.isInfoEnabled()) {
                logger.info("ack successfully, clientId:{} batchId:{} position:{}",
                    clientIdentity.getClientId(),
                    batchId,
                    positionRanges);
            }
        }

        // 可定时清理数据
        //从CanalEventStore中,将这个批次的binlog内容移除
        canalInstance.getEventStore().ack(positionRanges.getEnd(), positionRanges.getEndSeq());
    }

ack方法是客户端用户确认某个批次的binlog消费成功。确认之后,小于等于此 batchId 的 Message 都会被确认(其实就是更新消费成功的位置)。

rollback是用来回滚的。

/**
     * 回滚到未进行 {@link #ack} 的地方,下次fetch的时候,可以从最后一个没有 {@link #ack} 的地方开始拿
     */
    @Override
    public void rollback(ClientIdentity clientIdentity) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        // 因为存在第一次链接时自动rollback的情况,所以需要忽略未订阅
        boolean hasSubscribe = canalInstance.getMetaManager().hasSubscribe(clientIdentity);
        if (!hasSubscribe) {
            return;
        }

        synchronized (canalInstance) {
            // 清除batch信息,所有批次
            canalInstance.getMetaManager().clearAllBatchs(clientIdentity);
            // rollback eventStore中的状态信息
            canalInstance.getEventStore().rollback();
            logger.info("rollback successfully, clientId:{}", new Object[] { clientIdentity.getClientId() });
        }
    }

eventstore的rollback方法也很简单,主要是更新下标,

public void rollback() throws CanalStoreException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            getSequence.set(ackSequence.get());//更新最后一次get的位置为ack的位置
            getMemSize.set(ackMemSize.get());//更新get内存大小为ack内存大小
        } finally {
            lock.unlock();
        }
    }

参考:

  • http://www.tianshouzhi.com/api/tutorials/canal/382

以上是关于canal 源码解析系列-CanalServerWithEmbedded解读的主要内容,如果未能解决你的问题,请参考以下文章

canal 源码解析系列-sink模块解析

canal 源码解析系列-EventParser模块解析1

canal 源码解析系列-工程结构说明

canal 源码解析系列-CanalServerWithEmbedded解读

canal 源码解析系列-canal的通信数据结构

canal 源码解析系列-EventParser模块解析2