数据同步中间件服务

Posted 土豆仙

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了数据同步中间件服务相关的知识,希望对你有一定的参考价值。

   数据同步可用性还是很高的,这两天有时间便来总结下开源数据同步方案。

   个人学习流程:


   适合查阅人员:后端开发,测试人员,数据分析工程师。


概念介绍:

ETL(数据仓库技术):英文Extract-Transform-Load的缩写,用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程。


数据同步划分

全量同步:

  • 分页查询源端的表,然后通过 jdbc的batch 方式插入到目标表

  • 基于数据文件导出和导入的全量同步,这种同步方式一般只适用于同种数据库之间的同步

增量同步:

增量同步一般是做实时的同步

  • 基于触发器的增量同步

  • 开源中间件实现,基于数据库日志实现(例如mysql的binlog)


开源工具

  • datax

    datax 是阿里开源的etl 工具,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能,采用java+python进行开发,核心是java语言实现。数据交换通过DataX进行中转,任何数据源只要和DataX连接上即可以和已实现的任意数据源同步。


  • databus

    Databus是一个实时的、可靠的、支持事务的、保持一致性的数据变更抓取系统。2011年在LinkedIn正式进入生产系统,2013年开源。


  • gobblin

  Gobblin是用来整合各种数据源的通用型ETL框架,在某种意义上,各种数据都可以在这里“一站式”的解决ETL整个过程,专为大数据采集而生,易于操作和监控,提供流式抽取支持。主要用于Kafka的数据同步到HDFS。


  • MongoShake

 MongoShake是阿里巴巴Nosql团队开源出来的一个项目,主要用于mongdb的数据同步到kafka或者其他的mongdb数据库中。集群数据同步是其中核心应用场景,通过抓取oplog后进行回放达到同步目的,实现灾备和多活的业务场景。


  • Flinkx

  FlinkX是一个基于Flink的批流统一的数据同步工具,既可以采集静态的数据,比如MySQL,HDFS等,也可以采集实时变化的数据,比如MySQL binlog,Kafka等。


  • canal

  阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。


  • 其他

  1. 使用spark任务,通过HQl读取数据,然后再通过hbase的Api插入到hbase中。

  2. 使用BulkLoad可以快速导入,BulkLoad主要是借用了hbase的存储设计思想,因为hbase本质是存储在hdfs上的一个文件夹,然后底层是以一个个的Hfile存在的。如果不是hive中的数据,比如外部的数据,那么我们可以将外部的数据生成文件,然后上传到hdfs中,组装RowKey,然后将封装后的数据在回写到HDFS上,以HFile的形式存储到HDFS指定的目录中。

  3. 基于sqoop的全量导入


比较总结:

  datax一般比较适合于全量数据同步,对全量数据同步效率很高(任务可以拆分,并发同步,所以效率高),对于增量数据同步支持的不太好(可以依靠时间戳+定时调度来实现,但是不能做到实时,延迟较大)。

    canal 、databus 等由于是通过日志抓取的方式进行同步,所以对增量同步支持的比较好。

databus,flinkx活跃度也不是非常高,关注的人还不是很多,MongoShake专为mongdb服务不作考虑。

    比较分析,同步服务选用datax,canal使用人多,国人开发,文档源码易读。


本文主要介绍datax,canal两款工具。


1.全量离线同步datax

   DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件(插件化是值得总结的技术技巧),以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。

数据同步中间件服务

流程:

数据同步中间件服务

架构:

数据同步中间件服务

核心模块介绍:

  1. DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。

  2. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。

  3. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。

  4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。

  5. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

DataX调度流程:

  举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。DataX的调度决策思路是:

  1. DataXJob根据分库分表切分成了100个Task。

  2. 根据20个并发,DataX计算共需要分配4个TaskGroup。

  3. 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

优势:

  • 每种插件都有自己的数据转换策略;

  • 提供作业全链路的流量以及数据量运行时监控,包括作业本身状态、数据流量、数据速度、执行进度等。

  • 由于各种原因导致传输报错的脏数据,DataX可以实现精确的过滤、识别、采集、展示,为用户提过多种脏数据处理模式;

  • 精确的速度控制

  • 健壮的容错机制,包括线程内部重试、线程级别重试;

插件视角看框架

  • Job:是DataX用来描述从一个源头到目的的同步作业,是DataX数据同步的最小业务单元;

  • Task:为最大化而把Job拆分得到最小的执行单元,进行并发执行;

  • TaskGroup:一组Task集合,在同一个TaskGroupContainer执行下的Task集合称为TaskGroup;

  • JobContainer:Job执行器,负责Job全局拆分、调度、前置语句和后置语句等工作的工作单元。类似Yarn中的JobTracker;

  • TaskGroupContainer:TaskGroup执行器,负责执行一组Task的工作单元,类似Yarn中的TAskTacker。

    即,Job拆分为Task,分别在框架提供的容器中执行,插件只需要实现Job和Task两部分逻辑。

    物理执行有三种运行模式:

  • Standalone:单进程运行,没有外部依赖;

  • Local:单进程运行,统计信息,错误信息汇报到集中存储;

  • Distrubuted:分布式多线程运行,依赖DataX Service服务;

    总体来说,当JobContainer和TaskGroupContainer运行在同一个进程内的时候就是单机模式,在不同进程执行就是分布式模式。

类型

数据源

Reader(读)

Writer(写)

文档

RDBMS 关系型数据库

MySQL

 、


Oracle    

    √    

    √    

 、


SQLServer

 、


PostgreSQL

 、


DRDS

 、


通用RDBMS(支持所有关系型数据库)

 、

阿里云数仓数据存储

ODPS

 、


ADS



OSS

 、


OCS

 、

NoSQL数据存储

OTS

 、


Hbase0.94

 、


Hbase1.1

 、


Phoenix4.x

 、


Phoenix5.x

 、


MongoDB

 、


Hive

 、

无结构化数据存储

TxtFile

 、


FTP

 、


HDFS

 、


Elasticsearch


时间序列数据库

OpenTSDB



TSDB


插件开发指南:https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md


使用:

环境要求:

  • Java 8(jdk版本建议1.8.201以上)

  • Python2.7(Pythonn3和Python2同时安装的可以修改执行文件名)

  • Database: Mysql5.7

(1)、下载DataX源码: git clone git@github.com:alibaba/DataX.git
(2)、通过maven打包: mvn -U clean package assembly:assembly -Dmaven.test.skip=true
 结构如下:bin conf job lib log log_perf plugin script tmp

注意连接数据库的jar(可替换成自己对应的数据库版本)

可视化界面:

(1)、下载datax-web源码git clone https://github.com/WeiYe-Jing/datax-web.git
(2)、创建数据库执行bin/db下面的datax_web.sql文件
(3)、修改配置1.修改datax_admin下resources/application.yml文件2.修改datax_executor下resources/application.yml文件


启动项目:

1.本地idea开发环境

  • 运行datax_admin下 DataXAdminApplication

  • 运行datax_executor下 DataXExecutorApplication

数据同步中间件服务

启动成功后打开页面(默认管理员用户名:admin 密码:123456)

http://localhost:8080/index.html#/dashboard


2.配置执行

  • 创建项目

数据同步中间件服务


  • 创建数据源库与目标库

数据同步中间件服务

  • 创建定时同步任务模板

数据同步中间件服务

  • 创建同步规则Json

数据同步中间件服务

  • 执行同步任务

数据同步中间件服务

参数配置说明:

同步规则Json

{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "column": [//需要同步的字段 "district_code", "district_name" ], "connection": [//源数据库连接 { "jdbcUrl": [ "jdbc:mysql://localhost:3306/cci?characterEncoding=utf8&allowPublicKeyRetrieval=true&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC" ], "table": [//源表 "std_com_code_district" ] } ], "password": "123456", "username": "root", "where": "" } }, "writer": { "name": "mysqlwriter", "parameter": { "column": [//目标字段 "district_code", "district_name" ], "connection": [//目标数据库连接 { "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/cci_canal?useUnicode=true&characterEncoding=utf8&useSSL=false", "table": [ "syn_table_test" ] } ], "password": "123456", "preSql": [], "session": [], "username": "root", "writeMode": "insert" //写入模式 } } } ], "setting": { "speed": { "channel": "1" } } }}

效果:

每天定时同步数据

源表

数据同步中间件服务

目标表:

数据同步中间件服务


总结:

    Datax适合离线全量同步,对于增量同步也有伪方案

    MysqlReader使用JDBC SELECT语句完成数据抽取工作,因此可以使用SELECT…WHERE…进行增量数据抽取,方式有多种:

  • 数据库在线应用写入数据库时,填充modify字段为更改时间戳,包括新增、更新、删除(逻辑删)。对于这类应用,MysqlReader只需要WHERE条件跟上一同步阶段时间戳即可。

  • 对于新增流水型数据,MysqlReader可以WHERE条件后跟上一阶段最大自增ID即可。

   对于业务上无字段区分新增、修改数据情况,MysqlReader也无法进行增量数据同步,只能同步全量数据。


  以上是datax数据同步,以后数据同步就不需要自己写定时任务了,简单的配一配,复杂的在这基础上再开发。完美!


2.增量实时同步canal

数据同步中间件服务

开源的组件,已经实现模拟成一个mysql的slave,拉取binlog的服务:

  • 阿里巴巴开源的canal

  • 美团开源的puma

  • linkedin开源的databus  


增量同步服务要考虑的问题:

  • 如何解决重复插入

  • 如何解决唯一索引冲突

  • 对于DDL语句如何处理

  • 如何解决数据回环问题


canal/otter基于数据库的日志解析,获取增量变更进行同步可以实现量订阅&消费的业务。canal的最新版本已经实现了GTID,数据重复插入,数据回环。可以进行DDL语句过滤。

应用场景:

  1. 数据库镜像

  2. 数据库实时备份

  3. 多级索引 (卖家和买家各自分库索引)

  4. search build

  5. 业务cache刷新

  6. 价格变化等重要业务消息


原理:

  1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议

  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)

  3. canal解析binary log对象(原始为byte流)

数据同步中间件服务

Canal内部组件解析

数据同步中间件服务


数据同步中间件服务


  1. Canal 节点,可以有多个instances,每个instance在运行时为一个单独的Spring Context,对象实例为“CanalInstanceWithSpring”。
  2. 每个instances有一个单独的线程处理整个数据流过程。
    instance 内部有EventParser、EventSink、EventStore、metaManager主要四个组件构成,当然还有其他的守护组件比如monitor、HA心跳检测、ZK事件监听等。对象实例初始化和依赖关系,可以参见“default-instance.xml”,其配置模式为普通的Spring。(源码参见:SpringCanalInstanceGenerator

    Canal server的HA机制:

       canal的ha分为两部分,canal server和canal client分别有对应的ha实现canal server:  为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态.

          canal client: 为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。整个HA机制的控制主要是依赖了zookeeper的几个特性,watcher和EPHEMERAL节点(和session生命周期绑定)





    数据同步中间件服务

  3. Parser主要用于解析指定"数据库"的binlog,内部基于JAVA实现的“binlog dump”、“show master status”等。Parser会与ZK交互,并获取当前instance所有消费者的cursor,并获其最小值,作为此instance解析binlog的起始position。目前的实现,一个instance同时只能有一个consumer处于active消费状态,ClientId为定值“1001”,“cursor”中包含consumer消费binlog的position,数字类型。有次可见,Canal instance本身并没有保存binlog的position,Parser中继操作是根据consumer的消费cursor位置来决定;对于信息缺失时,比如Canal集群初次online,且在“default-instance.xml”中也没有指定“masterPositiion”信息(每个instance.properties是可以指定起始position的),那么将根据“show master status”指令获取当前binlog的最后位置。
    (源码:MysqlEventParser.findStartPosition()
    mysql的Binlay log网络协议:

    数据同步中间件服务

  4. Parser每次、批量获取一定条数的binlog,将binlog数据封装成event,并经由EventSink将消息转发给EventStore,Sink的作用就是“协调Parser和Store”,确保binglog的解析速率与Store队列容量相容。

    数据同步中间件服务

  5. EventStore,用于暂存“尚未消费”的events的存储队列,默认基于内存的阻塞队列实现。Store中的数据由Sink组件提交入队,有NettyServer服务的消费者消费确认后出队,队列的容量和容量模式由“canal.properties”中的“memory”相关配置决定。当Store中容量溢满时,将会阻塞Sink操作(间接阻塞Parser),所以消费者的效能会直接影响instance的同步效率。借鉴了Disruptor的RingBuffer的实现思路

  6. metaManager :主要用于保存Parser组件、CanalServer(即本文中提到的NettyServer)、Canal Instances的meta数据,其中Parser组件涉及到的binlog position、CanalServer与消费者交互时ACK的Cursor信息、instance的集群运行时信息等。根据官方解释,我们在production级别、高可靠业务要求场景下,metaManager建议基于Zookeeper实现。
    其中有关Position信息由CanalLogPositionManager类负责,其实现类有多个,在Cluster模式下,建议基于FailbackLogPositionManager,其内部有“primary”、“failback”两级组合,优先基于primary来存取Position,只有当primary异常时会“降级”使用failback;其配置模式,建议与“default-instance.xml”保持一致。

数据同步中间件服务


数据同步中间件服务

部署使用:


数据同步中间件服务

    canal的作用就是类似于前面所述的binlog syncer,拉取解析binlog。otter是canal的客户端,专门用于进行数据同步,类似于前文所讲解的sql writer。


安装步骤:

canal+kafka+mysql+canal-admin联合部署

docker run --name localdb_mysql -v D:docker_datamysqlmy.cnf:/etc/mysql/conf.d -v D:docker_datamysqllogs:/logs -v D:docker_datamysqldata:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123456 -d -i -p 3306:3306 mysql:5.7.18
mysql数据库配置(docker安装可以使用echo写入)cd /etc/mysql/mysql.conf.decho -e '[mysqld] pid-file = /var/run/mysqld/mysqld.pid socket = /var/run/mysqld/mysqld.sock datadir = /var/lib/mysql symbolic-links=0 server-id = 1 log-bin = binlog log-bin-index = binlog.index'>mysqld.cnf#添加这一行就ok log-bin=mysql-bin#选择row模式binlog-format=ROW#配置mysql replaction需要定义,不能和canal的slaveId重复server_id=1
创建同步账号CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;FLUSH PRIVILEGES;
//单机测试暂时不用安装zk和kafka安装zookeeperdocker run --privileged=true -d --name zookeeper --publish 2181:2181 -d zookeeper:latest
安装zookeeper客户端  =》zkui  或者 prettyZoo
安装kafkadocker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=localhost --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka
安装kafka管理界面docker run -d --name kafka-manager --link zookeeper:zookeeper --link kafka:kafka -p 9001:9000 --restart=always --env ZK_HOSTS=zookeeper:2181 sheepkiller/kafka-manager
启动canal、canal-admin、canal-adaptergit clone https://github.com/alibaba/canal.git
测试kafka安装成功://创建topicbin/kafka-topics.sh --create --zookeeper zookeeper :2181 --replication-factor 1 --partitions 1 --topic mykafka//查看topicbin/kafka-topics.sh --list --zookeeper zookeeper :2181//创建生产者bin/kafka-console-producer.sh --broker-list zookeeper :9092 --topic mykafka //创建消费者bin/kafka-console-consumer.sh --zookeeper zookeeper :2181 --topic mykafka --from-beginning


配置:

canal配置:

## 当前canal节点部署的instances列表,以“,”分割 ##比如:test,example canal.destinations= example ##canal配置文件主目录,保持默认即可。##除非你为了提高canal的动态管理能力,将conf文件迁移到了其他目录(比如NFS目录等) canal.conf.dir = ../conf # 是否开启“instance”配置修改自动扫描和重载 ##1)conf.dir目录下新增、删除instance配置目录 ##2)instance配置目录下的instance.properties变更 ##不包含:canal.properties,spring/*.xml的配置变更 ##如果环境隔离、测试充分的环境下,或者应用试用初期,可以开启 ##对于高风险项目,建议关闭。canal.auto.scan = true canal.auto.scan.interval = 5   ##instance管理模式,Production级别我们要求使用spring canal.instance.global.mode = spring ##直接初始化和启动instance canal.instance.global.lazy = false ##Production级别,HA模式下,基于default-instance.xml ##需要即备的ZK集群,且不应该修改此文件的默认配置。 ##如果有自定义的场景,应该新建${instance}-instance.xml文件 canal.instance.global.spring.xml = classpath:spring/default-instance.xml    ##canal server的唯一标识,没有实际意义,但是我们建议同一个cluster上的不同节点,其ID尽可能唯一(后续升级) ##数字类型 canal.id = 1 ##canal server因为binding的本地IP地址,建议使用内网(唯一,集群可见,consumer可见)IP地址,比如“10.0.1.21”。 #此IP主要为canalServer提供TCP服务而使用,将会被注册到ZK中,Consumer将与此IP建立连接。canal.ip = ##conal server的TCP端口 canal.port = 11111 ##Production场景,HA模式下,比如使用ZK作为服务管理,此处至少指定“多数派ZK Node”的IP列表 ##如果你的多个Canal Cluster共享ZK,那么每个Canal还需要使用唯一的“rootpath”。canal.zkServers = 10.0.1.21:2818,10.0.1.22,10.0.2.21:2818/canal/g1 # flush data to zk ##适用于metaManager,基于period模式 ##metaManager优先将数据(position)保存在内存,然后定时、间歇性的将数据同步到ZK中。##此参数用于控制同步的时间间隔,建议为“1000”(1S),单位:ms。##运维或者架构师,应该观察ZK的效能,如果TPS过于频繁,可以提高此值、或者按Canal集群分离ZK集群。##目前架构下,Consumer向CanalServer提交ACK时会导致ZK数据的同步。canal.zookeeper.flush.period = 1000 ##canal将parse、position数据写入的本地文件目录,HA环境下无效。##(file-instance.xml) canal.file.data.dir = ${canal.conf.dir} canal.file.flush.period = 1000 ##内存模式,EventStore为Memory类型时。(default-instance.xml) ##可选值:##1) MEMSIZE 根据buffer.size * buffer.memunit的大小,限制缓存记录的大小,简答来说,就是内存容量大小限制 ##2) ITEMSIZE 根据buffer.size进行限制,简单来说,就是根据event的条数限制。##如果Canal上的instances个数有限,且Consumer的消费效率很高,甚至接近或者高于binlog解析效率,那么可以适度增加memory有关的数值。##此外batchMode还与消费者的batchSize有些关系,消费者每次能消费的数据量,取决于此mode。##如果mode为itemSize,则consumer每次获取的消息的条数为batchSize条。##如果mode为memSize,那么consumer消费的数据总量为batchSize * memunit canal.instance.memory.batch.mode = MEMSIZE canal.instance.memory.buffer.size = 16384 canal.instance.memory.buffer.memunit = 1024   # 所能支撑的事务的最大长度,超过阈值之后,一个事务的消息将会被拆分,并多次提交到eventStore中,但是将无法保证事务的完整性 canal.instance.transaction.size = 1024 # 当instance.properties配置文件中指定“master”、“standby”时,当canal与“master”联通性故障时,触发连接源的切换, ##那么切换时,在新的mysql库上查找binlog时需要往前“回退”查找的时间,单位:秒。##良好架构下,我们建议不使用“standby”,限定一个数据库源。因为多个源时,数据库的调整频繁、协调不足,可能会引入一些数据问题。canal.instance.fallbackIntervalInSeconds = 60   ## 有关HA心跳检测部分,主要用在Parser管理dump连接时使用。## 我们在HA环境时建议开启。canal.instance.detecting.enable = true #如果你需要限定某个database的可用性验证(比如库锁), #最好使用复杂的、有效的SQL,比如:insert into {database}.{tmpTable} .... canal.instance.detecting.sql = select 1 ##心跳检测频率,单位秒 canal.instance.detecting.interval.time = 6 ##重试次数 ##非常注意:interval.time * retry.threshold值,应该参考既往DBA同学对数据库的故障恢复时间, ##“太短”会导致集群运行态角色“多跳”;“太长”失去了活性检测的意义,导致集群的敏感度降低,Consumer断路可能性增加。canal.instance.detecting.retry.threshold = 5 #如果在instance.properties配置了“master”、“standby”,且此参数开启时,在“探测失败”后,会选择备库进行binlog获取 #建议关闭 canal.instance.detecting.heartbeatHaEnable = false   # CanalServer、instance有关的TCP网络配置,建议保持抱人 canal.instance.network.receiveBufferSize = 16384 canal.instance.network.sendBufferSize = 16384 canal.instance.network.soTimeout = 30  # Parser组件,有关binlog解析的过滤 ##是否过滤dcl语句,比如“grant/create user”等 canal.instance.filter.query.dcl = false ##dml语句:insert/update/delete等 canal.instance.filter.query.dml = false ##ddl语句:create table/alter table/drop table以及一些index变更 canal.instance.filter.query.ddl = false canal.instance.filter.table.error = false canal.instance.filter.rows = false   # binlog格式和“镜像”格式检测,建议保持默认 canal.instance.binlog.format = ROW,STATEMENT,MIXED canal.instance.binlog.image = FULL,MINIMAL,NOBLOB   # ddl是否隔离发送,保持默认 canal.instance.get.ddl.isolation = false


1.instance列表定义 (列出当前server上有多少个instance,每个instance的加载方式)


参数名字

参数说明

默认值

canal.destinations

当前server上部署的instance列表

canal.conf.dir

conf/目录所在的路径

../conf

canal.auto.scan

开启instance自动扫描

如果配置为true,canal.conf.dir目录下的instance配置变化会自动触发:

a. instance目录新增:触发instance配置载入,lazy为true时则自动启动

b. instance目录删除:卸载对应instance配置,如已启动则进行关闭

c. instance.properties文件变化:reload instance配置,如已启动自动进行重启操作

true

canal.auto.scan.interval

instance自动扫描的间隔时间,单位秒

5

canal.instance.global.mode

全局配置加载方式

spring

canal.instance.global.lazy

全局lazy模式

false

canal.instance.global.manager.address

全局的manager配置方式的链接信息

canal.instance.global.spring.xml

全局的spring配置方式的组件文件

classpath:spring/file-instance.xml 

(spring目录相对于canal.conf.dir)

canal.instance.example.mode

canal.instance.example.lazy

canal.instance.example.spring.xml

…..

instance级别的配置定义,如有配置,会自动覆盖全局配置定义模式

命名规则:canal.instance.{name}.xxx

canal.instance.tsdb.spring.xml

v1.0.25版本新增,全局的tsdb配置方式的组件文件

classpath:spring/tsdb/h2-tsdb.xml 

(spring目录相对于canal.conf.dir)

2.common参数定义,比如可以将instance.properties的公用参数,抽取放置到这里,这样每个instance启动的时候就可以共享.

参数名字

参数说明

默认值

canal.id

每个canal server实例的唯一标识,暂无实际意义

1

canal.ip

canal server绑定的本地IP信息,如果不配置,默认选择一个本机IP进行启动服务

canal.port

canal server提供socket服务的端口

11111

canal.zkServers

canal server链接zookeeper集群的链接信息

例子:127.0.0.1:2181,127.0.0.1:2182

canal.zookeeper.flush.period

canal持久化数据到zookeeper上的更新频率,单位毫秒

1000

canal.file.data.dir

canal持久化数据到file上的目录

../conf (默认和instance.properties为同一目录,方便运维和备份)

canal.file.flush.period

canal持久化数据到file上的更新频率,单位毫秒

1000

canal.instance.memory.batch.mode

canal内存store中数据缓存模式

1. ITEMSIZE : 根据buffer.size进行限制,只限制记录的数量

2. MEMSIZE : 根据buffer.size * buffer.memunit的大小,限制缓存记录的大小

MEMSIZE

canal.instance.memory.buffer.size

canal内存store中可缓存buffer记录数,需要为2的指数

16384

canal.instance.memory.buffer.memunit

内存记录的单位大小,默认1KB,和buffer.size组合决定最终的内存使用大小

1024

canal.instance.transactionn.size

最大事务完整解析的长度支持

超过该长度后,一个事务可能会被拆分成多次提交到canal store中,无法保证事务的完整可见性

1024

canal.instance.fallbackIntervalInSeconds

canal发生mysql切换时,在新的mysql库上查找binlog时需要往前查找的时间,单位秒

说明:mysql主备库可能存在解析延迟或者时钟不统一,需要回退一段时间,保证数据不丢

60

canal.instance.detecting.enable

是否开启心跳检查

false

canal.instance.detecting.sql

心跳检查sql

insert into retl.xdual values(1,now()) on duplicate key update x=now()

canal.instance.detecting.interval.time

心跳检查频率,单位秒

3

canal.instance.detecting.retry.threshold

心跳检查失败重试次数

3

canal.instance.detecting.heartbeatHaEnable

心跳检查失败后,是否开启自动mysql自动切换

说明:比如心跳检查失败超过阀值后,如果该配置为true,canal就会自动链到mysql备库获取binlog数据

false

canal.instance.network.receiveBufferSize

网络链接参数,SocketOptions.SO_RCVBUF

16384

canal.instance.network.sendBufferSize

网络链接参数,SocketOptions.SO_SNDBUF

16384

canal.instance.network.soTimeout

网络链接参数,SocketOptions.SO_TIMEOUT

30

canal.instance.filter.query.dcl

是否忽略DCL的query语句,比如grant/create user等

false

canal.instance.filter.query.dml

是否忽略DML的query语句,比如insert/update/delete table.(mysql5.6的ROW模式可以包含statement模式的query记录)

false

canal.instance.filter.query.ddl

是否忽略DDL的query语句,比如create table/alater table/drop table/rename table/create index/drop index. (目前支持的ddl类型主要为table级别的操作,create databases/trigger/procedure暂时划分为dcl类型)

false

canal.instance.filter.druid.ddl

v1.0.25版本新增,是否启用druid的DDL parse的过滤,基于sql的完整parser可以解决之前基于正则匹配补全的问题,默认为true

true

canal.instance.get.ddl.isolation

ddl语句是否隔离发送,开启隔离可保证每次只返回发送一条ddl数据,不和其他dml语句混合返回.(otter ddl同步使用)

false

conf/example/instance.properties配置文件:

在canal.properties定义了canal.destinations后,需要在canal.conf.dir对应的目录下建立同名的文件。

数据同步中间件服务

如果canal.properties未定义instance列表,但开启了canal.auto.scan时

  • server第一次启动时,会自动扫描conf目录下,将文件名做为instance name,启动对应的instance

  • server运行过程中,会根据canal.auto.scan.interval定义的频率,进行扫描

1. 发现目录有新增,启动新的instance

2. 发现目录有删除,关闭老的instance

3. 发现对应目录的instance.properties有变化,重启instance


instance.properties参数列表:

## 每个instance都会伪装成一个mysql slave, ## 考虑到binlog同步的机制,我们需要指定slaveId,注意此ID对于此canal前端的MySQL实例而言,必须是唯一的。## 同一个Canal cluster中相同instance,此slaveId应该一样。## 我们约定,所有Canal的instance,其slaveId以“1111”开头,后面补充四位数字。canal.instance.mysql.slaveId = 11110001  # 数据库相关:master库 ##备注,master并不是要求是“MySQL 数据库Master”, ## 而是Canal instance集群模式下,HA运行态中“master”(首选节点) ## 当在故障恢复、Canal迁移时,我们需要手动指定binlog名称以及postition或者timestamp,确保新Canal不会丢失数据。## 数据库实例地址,ip:port canal.instance.master.address = 127.0.0.1:3306 ##指定起始的binlog文件名,保持默认 canal.instance.master.journal.name = ##此binlog文件的position位置(offset),数字类型。获取此position之后的数据。canal.instance.master.position = ##此binlog的起始时间戳,获取此timestamp之后的数据。canal.instance.master.timestamp =  ##standby库 ##考虑到我司现状,暂不使用standby #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp =  # 数据库连接的用户名和密码 # 貌似Consumer与CanalServer建立连接时也用的是此用户名和密码 canal.instance.dbUsername = canal canal.instance.dbPassword = canal # 默认数据库 canal.instance.defaultDatabaseName = canal.instance.connectionCharset = UTF-8  # schema过滤规则,类似于MySQL binlog的filter # canal将会过滤那些不符合要求的table,这些table的数据将不会被解析和传送 # filter格式,Consumer端可以指定,只不过是后置的。## 无论是CanalServer还是Consumer,只要有一方指定了filter都会生效,consumer端如果指定,则会覆盖CanalServer端。canal.instance.filter.regex = .*\..* # table black regex canal.instance.filter.black.regex =


参数名字

参数说明

默认值

canal.instance.mysql.slaveId

mysql集群配置中的serverId概念,需要保证和当前mysql集群中id唯一

1234

canal.instance.master.address

127.0.0.1:3306

canal.instance.master.journal.name

mysql主库链接时起始的binlog文件

canal.instance.master.position

mysql主库链接时起始的binlog偏移量

canal.instance.master.timestamp

mysql主库链接时起始的binlog的时间戳

canal.instance.dbUsername

mysql数据库帐号

canal

canal.instance.dbPassword

mysql数据库密码

canal

canal.instance.defaultDatabaseName

mysql链接时默认schema


canal.instance.connectionCharset

mysql 数据解析编码

UTF-8

canal.instance.filter.regex

mysql 数据解析关注的表,Perl正则表达式.

多个正则之间以逗号(,)分隔,转义符需要双斜杠(\) 

常见例子:

1.  所有表:.*   or  .*\..*

2.  canal schema下所有表:canal\..*

3.  canal下的以canal打头的表:canal\.canal.*

4.  canal schema下的一张表:canal.test1

5.  多个规则组合使用:canal\..*,mysql.test1,mysql.test2 (逗号分隔)

注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)

.*\..*

canal.instance.tsdb.enable

v1.0.25版本新增,是否开启table meta的时间序列版本记录功能

true

canal.instance.tsdb.dir

v1.0.25版本新增,table meta的时间序列版本的本地存储路径,默认为instance目录

{canal.instance.destination:}

canal.instance.tsdb.url

v1.0.25版本新增,table meta的时间序列版本存储的数据库链接串,比如例子为本地嵌入式数据库

jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;

canal.instance.tsdb.dbUsername

v1.0.25版本新增,table meta的时间序列版本存储的数据库链接账号

canal

canal.instance.tsdb.dbUsername

v1.0.25版本新增,table meta的时间序列版本存储的数据库链接密码

canal

说明:

mysql链接时的起始位置

  • canalinstance.master.journal.name + canal.instance.master.position : 精确指定一个binlog位点,进行启动

  • canal.instance.master.timestamp : 指定一个时间戳,canal会自动遍历mysql binlog,找到对应时间戳的binlog位点后,进行启动

  • 不指定任何信息:默认从当前数据库的位点,进行启动。(show master status)



测试简单配置:

#position info,需要改成自己的数据库信息canal.instance.master.address = 127.0.0.1:3306#username/password,需要改成自己的数据库信息canal.instance.dbUsername = canalcanal.instance.dbPassword = canal


适配器配置:

dataSourceKey: defaultDSdestination: examplegroupId: g1outerAdapterKey: mysconcurrent: truedbMapping: #源数据库 database: source #源表 table: source_test #目标表 targetTable: target_test targetPk: #源表和目标表的主键对应关系 id: id# mapAll: true targetColumns: name: name

目标:

数据同步中间件服务

对源表进行增删改操作同步到目标表

启动canal

数据同步中间件服务


启动rdb适配器=》同步mysql

数据同步中间件服务

canal-admin进行配置和实例管理

数据同步中间件服务

对源表插入数据

数据同步中间件服务

适配器日志:

2021-02-23 13:24:53.987 [pool-1-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":3,"name":"user3"},"database":"source","destination":"example","old":null,"table":"source_test","type":"INSERT"}

目标表:


项目中订阅:


  初步看,如果要实现异地双活,实时同步这样的需求,这两款组件貌似都没满足要求,但也不能完全无视其其他用途,比如canal配置zk和使用kafka,可以实现一对多订阅,实时同步mysql数据,刷redis缓存,elasticsearch搜索库等常用组件,效果上还是挺实用的。

    下一步整理下源码上写的有哪些可取的亮点特色。

以上是关于数据同步中间件服务的主要内容,如果未能解决你的问题,请参考以下文章

随行付数据同步中间件「Porter」开源啦

Express实战 - 应用案例- realworld-API - 路由设计 - mongoose - 数据验证 - 密码加密 - 登录接口 - 身份认证 - token - 增删改查API(代码片段

步步为营99-不同数据库数据实时同步

使用Goldengate同步异构数据库之Kafka中间件

学习ASP.NET Core,怎能不了解请求处理管道[2]: 服务器在管道中的“龙头”地位

中间件常见面试题