canal 源码解析系列-先把demo跑起来
Posted 犀牛饲养员
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了canal 源码解析系列-先把demo跑起来相关的知识,希望对你有一定的参考价值。
写在前面
把demo跑起来,一个是对canal的功能有个整体的认识,还有就是阅读源码过程中如果有看不懂的地方可以debug下。
环境准备
-
zk,安装后启动开启2181端口,具体过程省略。
-
mysql,我本地自己装了一个,方便修改数据验证。另外就是需要确认mysql的binlog格式是ROW才可以。
编写demo
demo分为两部分,服务端和客户端。这两部分我都是根据源码包里单元测试修改的。
server端
服务端demo,参考了源码包里的 com.alibaba.otter.canal.server.CanalServerTest
类,主要配置根据本地的情况做了一些修改。这里贴出来一些重要的部分。
public class CanalServerTestMain {
protected static final String cluster1 = "127.0.0.1:2181";
protected static final String DESTINATION = "test";
protected static final String DETECTING_SQL = "select 1"; //数据源心跳检测
protected static final String MYSQL_ADDRESS = "127.0.0.1"; //mysql地址,端口默认用3306
protected static final String USERNAME = "root"; //连接mysql用户名
protected static final String PASSWORD = "11111111"; //连接mysql 密码
protected static final String FILTER = ".*"; //过滤器,这里表示不过滤
private final ByteBuffer header = ByteBuffer.allocate(4);
private CanalServerWithNetty nettyServer;
public static void main(String[] args) {
CanalServerTestMain test = new CanalServerTestMain();
try {
test.setUp();
System.out.println("start");
} catch (Throwable e) {
e.printStackTrace();
} finally {
System.out.println("running");
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(60));//运行60s自动结束
} catch (Throwable e2) {
}
test.tearDown();
System.out.println("end");
}
}
private Canal buildCanal() {
Canal canal = new Canal();
canal.setId(1L);
canal.setName(DESTINATION);
canal.setDesc("test");
CanalParameter parameter = new CanalParameter();
parameter.setZkClusters(Arrays.asList(cluster1));
parameter.setMetaMode(MetaMode.MEMORY);
parameter.setHaMode(HAMode.HEARTBEAT);
parameter.setIndexMode(IndexMode.MEMORY);
parameter.setStorageMode(StorageMode.MEMORY);
parameter.setMemoryStorageBufferSize(32 * 1024);
parameter.setSourcingType(SourcingType.MYSQL);
parameter.setDbAddresses(Arrays.asList(new InetSocketAddress(MYSQL_ADDRESS, 3306),
new InetSocketAddress(MYSQL_ADDRESS, 3306)));
parameter.setDbUsername(USERNAME);
parameter.setDbPassword(PASSWORD);
// parameter.setPositions(Arrays.asList("{\\"journalName\\":\\"mysql-bin.000013\\",\\"position\\":156L,\\"timestamp\\":1322803601000L}",
// "{\\"journalName\\":\\"mysql-bin.000013\\",\\"position\\":156L,\\"timestamp\\":1322803601000L}"));
parameter.setSlaveId(1234L);
parameter.setDefaultConnectionTimeoutInSeconds(30);
parameter.setConnectionCharset("UTF-8");
parameter.setConnectionCharsetNumber((byte) 33);
parameter.setReceiveBufferSize(8 * 1024);
parameter.setSendBufferSize(8 * 1024);
parameter.setDetectingEnable(false);
parameter.setDetectingIntervalInSeconds(10);
parameter.setDetectingRetryTimes(3);
parameter.setDetectingSQL(DETECTING_SQL);
canal.setCanalParameter(parameter);
return canal;
}
这段代码要特别注意我注释掉的部分,这里指定了binlog文件以及订阅的位置,这个不一定和我们本地的数据库是一致的,为了避免报错,这里不指定让canal自己去数据库获取。
client端
client部分的demo是参考com.alibaba.otter.canal.example.SimpleCanalClientTest
这个类修改来的。
public static void main(String args[]) {
// 根据ip,直接创建链接,无HA的功能
String destination = "test";
String ip = AddressUtils.getHostIp();
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, 1088),
destination,
"",
"");
final SimpleCanalClientTest2 clientTest = new SimpleCanalClientTest2(destination);
clientTest.setConnector(connector);
clientTest.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
logger.info("## stop the canal client");
clientTest.stop();
} catch (Throwable e) {
logger.warn("##something goes wrong when stopping canal:", e);
} finally {
logger.info("## canal client is down.");
}
}));
}
这里主要是确保ip和端口和server端保持一直。
验证
分别运行起来服务端的demo和客户端的demo,看日志确保没有报错。然后在mysql随便修改或者新增一条记录,在客户端的日志可以看到如下的输出:
这个输出的意思是说,我insert了一条记录,记录保护三个字段:dt,pv和uv,以及他们各自的值。输出的结果还是比较直观的。
以上是关于canal 源码解析系列-先把demo跑起来的主要内容,如果未能解决你的问题,请参考以下文章