canal入门
Posted codingjav
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了canal入门相关的知识,希望对你有一定的参考价值。
前言
以前对canal仅仅是停留在书本的概念层面上,从没实际搭建操作过,这不趁着元旦假期,学习输出一波。在此也祝福大家新年快乐,希望2022年大家工作顺利,事业更进一步。
在日常工作中,数据不仅仅是直接保存在数据库中,还会涉及到其他中间组件,比如需要将数据同步到ES中供检索使用,也会把最新数据同步刷新到Redis等缓存中,实现数据一致性。这时就可以用到阿里开源的框架Canal,他可以很方便地同步数据库的增量数据到其他的存储应用。
1、什么是canal
我们先来看下官网的介绍:
canal,译意为水道/管道/沟渠,主要用途是基于 mysql 数据库增量日志解析,提供增量数据订阅和消费。
在这里我们简单的认为它就是个同步增量数据的工具。看下官网形象的示意图:
2、canal的作用
基于日志增量订阅和消费的业务包括:
-
数据库镜像
-
数据库实时备份
-
索引构建和实时维护(拆分异构索引、倒排索引等)
-
业务 cache 刷新
-
带业务逻辑的增量数据处理
3、canal的工作原理
3.1、mysql主备复制原理
-
MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
-
MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
-
MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
3.2、canal的工作原理
-
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
-
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
-
canal 解析 binary log 对象(原始为 byte 流)
4、搭建canal
4.1、搭建mysql服务器
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x,我这里采用的是5.7版本的,mysql具体的安装暂不举例,这些网上都有大把的介绍文章。
安装好mysql后,这里创建一个canal专属账号并且授权:
-- 使用命令登录:mysql -u root -p
-- 创建用户 用户名:canal 密码:canal
create user 'canal'@'%' identified by 'canal';
-- 授权 *.*表示所有库
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'canal';
同时开启mysql配置文件的bin_log日志和设置row模式。
[mysqld]
# 打开binlog
log-bin=mysql-bin
# 选择ROW(行)模式
binlog-format=ROW
# 配置MySQL replaction需要定义,不要和canal的slaveId重复
server_id=1
改了配置文件之后,重启MySQL,使用命令查看是否打开binlog模式:
查看当前正在写入的binlog文件:
MySQL服务器这边就搞定了,很简单。
4.2、搭建canal服务器
去官网下载页面进行下载:https://github.com/alibaba/canal/releases
我这里下载的是1.1.4的版本:
解压canal.deployer-1.1.4.tar.gz,我们可以看到以下文件夹:
接着打开配置文件conf/example/instance.properties,配置信息如下:
## mysql serverId , v1.0.26+ will autoGen
## v1.0.26版本后会自动生成slaveId,所以可以不用配置
# canal.instance.mysql.slaveId=0
# 数据库地址
canal.instance.master.address=127.0.0.1:3306
# binlog日志名称
canal.instance.master.journal.name=mysql-bin.000001
# mysql主库链接时起始的binlog偏移量
canal.instance.master.position=154
# mysql主库链接时起始的binlog的时间戳
canal.instance.master.timestamp=
canal.instance.master.gtid=
# username/password
# 在MySQL服务器授权的账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 字符集
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
# table regex .*\\\\..*表示监听所有表 也可以写具体的表名,用,隔开
canal.instance.filter.regex=.*\\\\..*
# mysql 数据解析表的黑名单,多个表用,隔开
canal.instance.filter.black.regex=
然后,就可以用bin包下面的 srartup.sh 命令启动canal服务。可以在logs/canal下面看到启动成功输出日志。
5、Java接入canal实操
5.1、引入canal依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
5.2、使用Springboot搭建一个canal项目
canal client代码如下所示:
/**
* 单机Java客户端 消费 canal 消息
* @author jinwu
*/
@Componentpublic class CanalClient implements InitializingBean private final static int BATCH_SIZE = 1000; @Override public void afterPropertiesSet() throws Exception
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
try
//打开连接
connector.connect();
//订阅数据库表,全部表
connector.subscribe(".*\\\\..*");
//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
connector.rollback();
while (true)
// 获取指定数量的数据
Message message = connector.getWithoutAck(BATCH_SIZE);
//获取批量ID
long batchId = message.getId();
//获取批量的数量
int size = message.getEntries().size();
//如果没有数据
if (batchId == -1 || size == 0)
try
//线程休眠2秒
Thread.sleep(2000);
catch (InterruptedException e)
e.printStackTrace();
else
//如果有数据,处理数据
printEntry(message.getEntries());
//进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
connector.ack(batchId);
catch (Exception e)
e.printStackTrace();
finally
connector.disconnect();
/**
* 打印canal server解析binlog获得的实体类信息
*/
private static void printEntry(List<CanalEntry.Entry> entrys)
for (CanalEntry.Entry entry : entrys)
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND)
//开启/关闭事务的实体类型,跳过
continue;
//RowChange对象,包含了一行数据变化的所有特征
//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
CanalEntry.RowChange rowChange;
try
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
catch (Exception e)
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
//获取操作类型:insert/update/delete类型
CanalEntry.EventType eventType = rowChange.getEventType();
//打印Header信息
System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
//判断是否是DDL语句
if (rowChange.getIsDdl())
System.out.println("================》;isDdl: true,sql:" + rowChange.getSql());
//获取RowChange对象里的每一行数据,打印出来
for (CanalEntry.RowData rowData : rowChange.getRowDatasList())
//如果是删除语句
if (eventType == CanalEntry.EventType.DELETE)
printColumn(rowData.getBeforeColumnsList());
//如果是新增语句
else if (eventType == CanalEntry.EventType.INSERT)
printColumn(rowData.getAfterColumnsList());
//如果是更新的语句
else
//变更前的数据
System.out.println("------->; before");
printColumn(rowData.getBeforeColumnsList());
//变更后的数据
System.out.println("------->; after");
printColumn(rowData.getAfterColumnsList());
/**
* 仅仅是打印出来,实际肯定会处理
* @param columns
*/
private static void printColumn(List<CanalEntry.Column> columns)
for (CanalEntry.Column column : columns)
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
以上步骤就已经完成所有的开发准备工作,下面就可以进入测试阶段,验证是否可用。代码实现部分也只是简单打印出,未做具体的处理。
本地已经创建好了测试订单表,结构如下:
CREATE TABLE `order` (
`id` bigint(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`order_id` varchar(100) NOT NULL DEFAULT '""' COMMENT '订单ID',
`status` int(11) DEFAULT NULL COMMENT '订单状态',
`pay_price` decimal(10,2) NOT NULL DEFAULT '0.00' COMMENT '支付金额',
`total_price` decimal(10,2) NOT NULL DEFAULT '0.00' COMMENT '总金额',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
这里新增一条数据到表中,看看控制台输出日志:
================》; binlog[mysql-bin.000001:363] , name[study,order] , eventType : INSERT
id : 13 update=true
order_id : 533879506af44b9493786110afa02311 update=true
status : 1 update=true
pay_price : 50.0 update=true
total_price : 100.0 update=true
create_time : 2022-01-01 11:10:16 update=true
update_time : 2022-01-01 11:10:16 update=true
6、总结
canal的好处是对于业务代码没有侵入,因为是基于mysql binlog日志去实现同步数据。也能够做到准实时,是很多企业一种比较常见的数据同步的方案。
当然这里仅仅是个入门级别的用例,简单讲解了canal是什么、用来干什么、工作原理等,在实际开发工作中肯定不会是这么简单的。在实际开发中,基本上都是结合消息中间件【RocketMQ或者Kafka等】,canal会把接收到的数据发送到MQ的topic中,然后通过消息队列的消费者进行处理。
如果你觉得这篇文章对你有用,点个赞吧~ 你的点赞是我创作的最大动力~
想第一时间看到我更新的文章,可以微信搜索公众号「CodingCode」。我们下期再见!!!
以上是关于canal入门的主要内容,如果未能解决你的问题,请参考以下文章