canal 源码解析系列-先把demo跑起来

Posted 犀牛饲养员

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了canal 源码解析系列-先把demo跑起来相关的知识,希望对你有一定的参考价值。

写在前面

把demo跑起来,一个是对canal的功能有个整体的认识,还有就是阅读源码过程中如果有看不懂的地方可以debug下。

环境准备

  • zk,安装后启动开启2181端口,具体过程省略。

  • mysql,我本地自己装了一个,方便修改数据验证。另外就是需要确认mysql的binlog格式是ROW才可以。

编写demo

demo分为两部分,服务端和客户端。这两部分我都是根据源码包里单元测试修改的。

server端

服务端demo,参考了源码包里的 com.alibaba.otter.canal.server.CanalServerTest类,主要配置根据本地的情况做了一些修改。这里贴出来一些重要的部分。

public class CanalServerTestMain {

    protected static final String cluster1      = "127.0.0.1:2181";
    protected static final String DESTINATION   = "test";
    protected static final String DETECTING_SQL = "select 1"; //数据源心跳检测
    protected static final String MYSQL_ADDRESS = "127.0.0.1"; //mysql地址,端口默认用3306
    protected static final String USERNAME      = "root"; //连接mysql用户名
    protected static final String PASSWORD      = "11111111"; //连接mysql 密码
    protected static final String FILTER        = ".*"; //过滤器,这里表示不过滤

    private final ByteBuffer      header        = ByteBuffer.allocate(4);
    private CanalServerWithNetty  nettyServer;

    public static void main(String[] args) {

        CanalServerTestMain test = new CanalServerTestMain();
        try {
            test.setUp();
            System.out.println("start");
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            System.out.println("running");
            try {
                Thread.sleep(TimeUnit.SECONDS.toMillis(60));//运行60s自动结束
            } catch (Throwable e2) {
            }
            test.tearDown();
            System.out.println("end");
        }

    }

private Canal buildCanal() {
        Canal canal = new Canal();
        canal.setId(1L);
        canal.setName(DESTINATION);
        canal.setDesc("test");

        CanalParameter parameter = new CanalParameter();

        parameter.setZkClusters(Arrays.asList(cluster1));
        parameter.setMetaMode(MetaMode.MEMORY);
        parameter.setHaMode(HAMode.HEARTBEAT);
        parameter.setIndexMode(IndexMode.MEMORY);

        parameter.setStorageMode(StorageMode.MEMORY);
        parameter.setMemoryStorageBufferSize(32 * 1024);

        parameter.setSourcingType(SourcingType.MYSQL);
        parameter.setDbAddresses(Arrays.asList(new InetSocketAddress(MYSQL_ADDRESS, 3306),
            new InetSocketAddress(MYSQL_ADDRESS, 3306)));
        parameter.setDbUsername(USERNAME);
        parameter.setDbPassword(PASSWORD);
//        parameter.setPositions(Arrays.asList("{\\"journalName\\":\\"mysql-bin.000013\\",\\"position\\":156L,\\"timestamp\\":1322803601000L}",
//            "{\\"journalName\\":\\"mysql-bin.000013\\",\\"position\\":156L,\\"timestamp\\":1322803601000L}"));

        parameter.setSlaveId(1234L);

        parameter.setDefaultConnectionTimeoutInSeconds(30);
        parameter.setConnectionCharset("UTF-8");
        parameter.setConnectionCharsetNumber((byte) 33);
        parameter.setReceiveBufferSize(8 * 1024);
        parameter.setSendBufferSize(8 * 1024);

        parameter.setDetectingEnable(false);
        parameter.setDetectingIntervalInSeconds(10);
        parameter.setDetectingRetryTimes(3);
        parameter.setDetectingSQL(DETECTING_SQL);

        canal.setCanalParameter(parameter);
        return canal;
    }

这段代码要特别注意我注释掉的部分,这里指定了binlog文件以及订阅的位置,这个不一定和我们本地的数据库是一致的,为了避免报错,这里不指定让canal自己去数据库获取。

client端

client部分的demo是参考com.alibaba.otter.canal.example.SimpleCanalClientTest这个类修改来的。

public static void main(String args[]) {
        // 根据ip,直接创建链接,无HA的功能
        String destination = "test";
        String ip = AddressUtils.getHostIp();
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, 1088),
            destination,
            "",
            "");

        final SimpleCanalClientTest2 clientTest = new SimpleCanalClientTest2(destination);
        clientTest.setConnector(connector);
        clientTest.start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                logger.info("## stop the canal client");
                clientTest.stop();
            } catch (Throwable e) {
                logger.warn("##something goes wrong when stopping canal:", e);
            } finally {
                logger.info("## canal client is down.");
            }
        }));
    }

这里主要是确保ip和端口和server端保持一直。

验证

分别运行起来服务端的demo和客户端的demo,看日志确保没有报错。然后在mysql随便修改或者新增一条记录,在客户端的日志可以看到如下的输出:

这个输出的意思是说,我insert了一条记录,记录保护三个字段:dt,pv和uv,以及他们各自的值。输出的结果还是比较直观的。

以上是关于canal 源码解析系列-先把demo跑起来的主要内容,如果未能解决你的问题,请参考以下文章

canal 源码解析系列-CanalInstance模块解析

canal 源码解析系列-canal的HA机制解析

canal 源码解析系列-sink模块解析

canal 源码解析系列-EventParser模块解析1

canal 源码解析系列-工程结构说明

canal 源码解析系列-CanalServerWithEmbedded解读