canal入门

Posted codingjav

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了canal入门相关的知识,希望对你有一定的参考价值。

前言


    以前对canal仅仅是停留在书本的概念层面上,从没实际搭建操作过,这不趁着元旦假期,学习输出一波。在此也祝福大家新年快乐,希望2022年大家工作顺利,事业更进一步。

    在日常工作中,数据不仅仅是直接保存在数据库中,还会涉及到其他中间组件,比如需要将数据同步到ES中供检索使用,也会把最新数据同步刷新到Redis等缓存中,实现数据一致性。这时就可以用到阿里开源的框架Canal,他可以很方便地同步数据库的增量数据到其他的存储应用

1、什么是canal

我们先来看下官网的介绍:

canal,译意为水道/管道/沟渠,主要用途是基于 mysql 数据库增量日志解析,提供增量数据订阅和消费。

在这里我们简单的认为它就是个同步增量数据的工具。看下官网形象的示意图:

2、canal的作用

基于日志增量订阅和消费的业务包括:

  • 数据库镜像

  • 数据库实时备份

  • 索引构建和实时维护(拆分异构索引、倒排索引等)

  • 业务 cache 刷新

  • 带业务逻辑的增量数据处理

3、canal的工作原理

3.1、mysql主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)

  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)

  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

3.2、canal的工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议

  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

  • canal 解析 binary log 对象(原始为 byte 流)

4、搭建canal

4.1、搭建mysql服务器

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x,我这里采用的是5.7版本的,mysql具体的安装暂不举例,这些网上都有大把的介绍文章。

安装好mysql后,这里创建一个canal专属账号并且授权:

-- 使用命令登录:mysql -u root -p-- 创建用户 用户名:canal 密码:canalcreate user 'canal'@'%' identified by 'canal';-- 授权 *.*表示所有库grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'canal';

同时开启mysql配置文件的bin_log日志和设置row模式。

[mysqld]# 打开binloglog-bin=mysql-bin# 选择ROW(行)模式binlog-format=ROW# 配置MySQL replaction需要定义,不要和canal的slaveId重复server_id=1

改了配置文件之后,重启MySQL,使用命令查看是否打开binlog模式:

查看当前正在写入的binlog文件:

MySQL服务器这边就搞定了,很简单。

4.2、搭建canal服务器

去官网下载页面进行下载:https://github.com/alibaba/canal/releases

我这里下载的是1.1.4的版本:

解压canal.deployer-1.1.4.tar.gz,我们可以看到以下文件夹:

接着打开配置文件conf/example/instance.properties,配置信息如下:

## mysql serverId , v1.0.26+ will autoGen## v1.0.26版本后会自动生成slaveId,所以可以不用配置# canal.instance.mysql.slaveId=0# 数据库地址canal.instance.master.address=127.0.0.1:3306# binlog日志名称canal.instance.master.journal.name=mysql-bin.000001# mysql主库链接时起始的binlog偏移量canal.instance.master.position=154# mysql主库链接时起始的binlog的时间戳canal.instance.master.timestamp=canal.instance.master.gtid=# username/password# 在MySQL服务器授权的账号密码canal.instance.dbUsername=canalcanal.instance.dbPassword=canal# 字符集canal.instance.connectionCharset = UTF-8# enable druid Decrypt database passwordcanal.instance.enableDruid=false# table regex .*\\\\..*表示监听所有表 也可以写具体的表名,用,隔开canal.instance.filter.regex=.*\\\\..*# mysql 数据解析表的黑名单,多个表用,隔开canal.instance.filter.black.regex=

然后,就可以用bin包下面的 srartup.sh 命令启动canal服务。可以在logs/canal下面看到启动成功输出日志。

5、Java接入canal实操

5.1、引入canal依赖

<dependency>    <groupId>com.alibaba.otter</groupId>    <artifactId>canal.client</artifactId>    <version>1.1.4</version></dependency>

5.2、使用Springboot搭建一个canal项目

canal client代码如下所示:

/** * 单机Java客户端 消费 canal 消息 * @author jinwu */@Componentpublic class CanalClient implements InitializingBean 

private final static int BATCH_SIZE = 1000;        
       @Override
        public void afterPropertiesSet() throws Exception             // 创建链接            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");            try                 //打开连接                connector.connect();                //订阅数据库表,全部表                connector.subscribe(".*\\\\..*");                //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿                connector.rollback();                while (true)                     // 获取指定数量的数据                    Message message = connector.getWithoutAck(BATCH_SIZE);                    //获取批量ID                    long batchId = message.getId();                    //获取批量的数量                    int size = message.getEntries().size();                    //如果没有数据                    if (batchId == -1 || size == 0)                         try                             //线程休眠2秒                            Thread.sleep(2000);                         catch (InterruptedException e)                             e.printStackTrace();                                             else                         //如果有数据,处理数据                        printEntry(message.getEntries());                                        //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。                    connector.ack(batchId);                             catch (Exception e)                 e.printStackTrace();             finally                 connector.disconnect();                    /**         * 打印canal server解析binlog获得的实体类信息         */        private static void printEntry(List<CanalEntry.Entry> entrys)             for (CanalEntry.Entry entry : entrys)                 if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND)                     //开启/关闭事务的实体类型,跳过                    continue;                                //RowChange对象,包含了一行数据变化的所有特征                //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等                CanalEntry.RowChange rowChange;                try                     rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());                 catch (Exception e)                     throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);                                //获取操作类型:insert/update/delete类型                CanalEntry.EventType eventType = rowChange.getEventType();                //打印Header信息                System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),                        eventType));                //判断是否是DDL语句                if (rowChange.getIsDdl())                     System.out.println("================》;isDdl: true,sql:" + rowChange.getSql());                                //获取RowChange对象里的每一行数据,打印出来                for (CanalEntry.RowData rowData : rowChange.getRowDatasList())                     //如果是删除语句                    if (eventType == CanalEntry.EventType.DELETE)                         printColumn(rowData.getBeforeColumnsList());                        //如果是新增语句                     else if (eventType == CanalEntry.EventType.INSERT)                         printColumn(rowData.getAfterColumnsList());                        //如果是更新的语句                     else                         //变更前的数据                        System.out.println("------->; before");                        printColumn(rowData.getBeforeColumnsList());                        //变更后的数据                        System.out.println("------->; after");                        printColumn(rowData.getAfterColumnsList());                                                        /**     * 仅仅是打印出来,实际肯定会处理     * @param columns     */    private static void printColumn(List<CanalEntry.Column> columns)             for (CanalEntry.Column column : columns)                 System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());            

以上步骤就已经完成所有的开发准备工作,下面就可以进入测试阶段,验证是否可用。代码实现部分也只是简单打印出,未做具体的处理。

本地已经创建好了测试订单表,结构如下:

​​​​​​​

CREATE TABLE `order` (  `id` bigint(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',  `order_id` varchar(100) NOT NULL DEFAULT '""' COMMENT '订单ID',  `status` int(11) DEFAULT NULL COMMENT '订单状态',  `pay_price` decimal(10,2) NOT NULL DEFAULT '0.00' COMMENT '支付金额',  `total_price` decimal(10,2) NOT NULL DEFAULT '0.00' COMMENT '总金额',  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

这里新增一条数据到表中,看看控制台输出日志:

​​​​​​​

================》; binlog[mysql-bin.000001:363] , name[study,order] , eventType : INSERTid : 13    update=trueorder_id : 533879506af44b9493786110afa02311    update=truestatus : 1    update=truepay_price : 50.0    update=truetotal_price : 100.0    update=truecreate_time : 2022-01-01 11:10:16    update=trueupdate_time : 2022-01-01 11:10:16    update=true

6、总结

    canal的好处是对于业务代码没有侵入,因为是基于mysql binlog日志去实现同步数据。也能够做到准实时,是很多企业一种比较常见的数据同步的方案。

    当然这里仅仅是个入门级别的用例,简单讲解了canal是什么、用来干什么、工作原理等,在实际开发工作中肯定不会是这么简单的。在实际开发中,基本上都是结合消息中间件【RocketMQ或者Kafka等】,canal会把接收到的数据发送到MQ的topic中,然后通过消息队列的消费者进行处理

如果你觉得这篇文章对你有用,点个赞吧~    你的点赞是我创作的最大动力~

想第一时间看到我更新的文章,可以微信搜索公众号「CodingCode」。我们下期再见!!!

以上是关于canal入门的主要内容,如果未能解决你的问题,请参考以下文章

canal入门

canal入门

建议收藏超详细的Canal入门,看这篇就够了!!!

秒杀系统番外篇 | 阿里开源MySQL中间件Canal快速入门

建议收藏超详细的Canal入门,看这篇就够了!!!

Python入门基础