canal使用小结
Posted eric-fang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了canal使用小结相关的知识,希望对你有一定的参考价值。
一、基本概念
mysql本身支持主从备份,原理就是主库master生成的binlog文件记录了所有的增删改操作语句,然后slave向master发送dump协议,master将binlog日志文件推送给从库slave解析执行,达到数据一致备份的目的。
canal,基于java开发,伪装成一个slave,去监听获取增量的binlog日志文件,然后解析处理获得的相关数据(过程中可以加入自由的加入一些额外的功能性代码需求),利用获得的数据,可以用其他不同用途,比如同步到es中做搜索相关。
二、canal基本配置使用
测试环境:windows、mysql 5.7.26、canal 1.1.3、Navicat for MySQL。
1、mysql安装和配置
1.1、下载安装解压忽略。进入mysql解压后目录,新增data文件夹。
1.2、新增my.ini文件,添加配置:
[client] # 设置mysql客户端连接服务端时默认使用的端口 port=3311 [mysql] default-character-set=utf8 [mysqld] character-set-server=utf8 port=3311 # 默认存储引擎innoDB default-storage-engine=INNODB # Server Id.数据库服务器id,这个id用来在主从服务器中标记唯一mysql服务器 server-id=1 datadir=E:\\\\soft\\\\mysql2\\\\data bind-address=0.0.0.0 # 开启binlog日志 log-bin=mysql-bin binlog_format = ROW
1.3、cmd进入并目录,启动/关闭 mysql:
//启动 net start mysql //关闭 net stop mysql
1.4、连接mysql并设置密码
连接:mysql -uroot -p,初始密码为空,一直按enter即可进入mysql命令行。
进入后设置密码:
// 切换库 use mysql; // 设置密码 update user set authentication_string=PASSWORD("123456") where user="root"; // 刷新生效 flush privileges;
设置成功后,quit退出重进,输入密码123456。
1.5、新增个canal的访问账户
// 新增用户 CREATE USER ‘canal‘@‘%‘ IDENTIFIED BY ‘canal‘; // 授权 GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO ‘canal‘@‘%‘; // 刷新 FLUSH PRIVILEGES;
2、canal安装配置
下载canal包(https://github.com/alibaba/canal/releases),解压本地目录。
2.1、目录结构
其中cfang是拷贝example,需要多个instance可继续拷贝,再修改每个instance中的配置文件。
2.1、配置canal.properties
port可自定义,用于canal对外服务接口。destinations配置instance列表(连接db)。
2.2、配置instance.properties
其中canal.instance.defaultDatabaseName可不配置,全库扫描。
2.3、启动
bin目录,点击startup.bat,查看/logs/canal/canal.log日志文件,出现以下则为开启成功:
2.4、canal数据格式:
Entry Header logfileName [binlog文件名] logfileOffset [binlog position] executeTime [发生的变更] schemaName tableName eventType [insert/update/delete类型] entryType [事务头BEGIN/事务尾END/数据ROWDATA] storeValue [byte数据,可展开,对应的类型为RowChange] RowChange isDdl [是否是ddl变更操作,比如create table/drop table] sql [具体的ddl sql] rowDatas [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理] beforeColumns [Column类型的数组] afterColumns [Column类型的数组] Column index sqlType [jdbc type] name [column name] isKey [是否为主键] updated [是否发生过变更] isNull [值是否为null] value [具体的内容,注意为文本]
2.5、java程序测试
pom导入:
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.3</version> </dependency>
java测试:
package com.cfang.prebo; import java.net.InetSocketAddress; import java.util.List; import java.util.stream.Collectors; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.EventType; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.CanalEntry.RowData; import com.alibaba.otter.canal.protocol.Message; public class CanalTest public static void main(String[] args) throws Exception CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "cfang", "", ""); connector.connect(); connector.subscribe(".*\\\\..*"); connector.rollback(); while (true) Message message = connector.getWithoutAck(100); // 获取指定数量的数据 long batchId = message.getId(); if (batchId == -1 || message.getEntries().isEmpty()) Thread.sleep(1000); continue; // System.out.println(message.getEntries()); printEntries(message.getEntries()); connector.ack(batchId);// 提交确认,消费成功,通知server删除数据 // connector.rollback(batchId);// 处理失败, 回滚数据,后续重新获取数据 private static void printEntries(List<Entry> entries) throws Exception for (Entry entry : entries) if (entry.getEntryType() != EntryType.ROWDATA) continue; RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); EventType eventType = rowChange.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChange.getRowDatasList()) switch (rowChange.getEventType()) case INSERT: System.out.println("INSERT "); printColumns(rowData.getAfterColumnsList()); break; case UPDATE: System.out.println("UPDATE "); printColumns(rowData.getAfterColumnsList()); break; case DELETE: System.out.println("DELETE "); printColumns(rowData.getBeforeColumnsList()); break; default: break; private static void printColumns(List<Column> columns) for(Column column : columns) System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
Navicat中进行相关操作的时候,可在控台看到输出,例如:
以上是关于canal使用小结的主要内容,如果未能解决你的问题,请参考以下文章
Canal——canal server 读取 binlog 到 kafka 然后在使用 canal-adapter