数据同步中间件服务
Posted 土豆仙
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了数据同步中间件服务相关的知识,希望对你有一定的参考价值。
数据同步可用性还是很高的,这两天有时间便来总结下开源数据同步方案。
个人学习流程:
适合查阅人员:后端开发,测试人员,数据分析工程师。
概念介绍:
ETL(数据仓库技术):英文Extract-Transform-Load的缩写,用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程。
数据同步划分
全量同步:
分页查询源端的表,然后通过 jdbc的batch 方式插入到目标表
基于数据文件导出和导入的全量同步,这种同步方式一般只适用于同种数据库之间的同步
增量同步:
增量同步一般是做实时的同步
基于触发器的增量同步
开源中间件实现,基于数据库日志实现(例如mysql的binlog)
开源工具
datax
datax 是阿里开源的etl 工具,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能,采用java+python进行开发,核心是java语言实现。数据交换通过DataX进行中转,任何数据源只要和DataX连接上即可以和已实现的任意数据源同步。
databus
Databus是一个实时的、可靠的、支持事务的、保持一致性的数据变更抓取系统。2011年在LinkedIn正式进入生产系统,2013年开源。
gobblin
Gobblin是用来整合各种数据源的通用型ETL框架,在某种意义上,各种数据都可以在这里“一站式”的解决ETL整个过程,专为大数据采集而生,易于操作和监控,提供流式抽取支持。主要用于Kafka的数据同步到HDFS。
MongoShake
MongoShake是阿里巴巴Nosql团队开源出来的一个项目,主要用于mongdb的数据同步到kafka或者其他的mongdb数据库中。集群数据同步是其中核心应用场景,通过抓取oplog后进行回放达到同步目的,实现灾备和多活的业务场景。
Flinkx
FlinkX是一个基于Flink的批流统一的数据同步工具,既可以采集静态的数据,比如MySQL,HDFS等,也可以采集实时变化的数据,比如MySQL binlog,Kafka等。
canal
阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。
其他
使用spark任务,通过HQl读取数据,然后再通过hbase的Api插入到hbase中。
使用BulkLoad可以快速导入,BulkLoad主要是借用了hbase的存储设计思想,因为hbase本质是存储在hdfs上的一个文件夹,然后底层是以一个个的Hfile存在的。如果不是hive中的数据,比如外部的数据,那么我们可以将外部的数据生成文件,然后上传到hdfs中,组装RowKey,然后将封装后的数据在回写到HDFS上,以HFile的形式存储到HDFS指定的目录中。
基于sqoop的全量导入
比较总结:
datax一般比较适合于全量数据同步,对全量数据同步效率很高(任务可以拆分,并发同步,所以效率高),对于增量数据同步支持的不太好(可以依靠时间戳+定时调度来实现,但是不能做到实时,延迟较大)。
canal 、databus 等由于是通过日志抓取的方式进行同步,所以对增量同步支持的比较好。
databus,flinkx活跃度也不是非常高,关注的人还不是很多,MongoShake专为mongdb服务不作考虑。
比较分析,同步服务选用datax,canal使用人多,国人开发,文档源码易读。
本文主要介绍datax,canal两款工具。
1.全量离线同步datax
DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件(插件化是值得总结的技术技巧),以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。
流程:
架构:
核心模块介绍:
DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0
DataX调度流程:
举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。DataX的调度决策思路是:
DataXJob根据分库分表切分成了100个Task。
根据20个并发,DataX计算共需要分配4个TaskGroup。
4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。
优势:
每种插件都有自己的数据转换策略;
提供作业全链路的流量以及数据量运行时监控,包括作业本身状态、数据流量、数据速度、执行进度等。
由于各种原因导致传输报错的脏数据,DataX可以实现精确的过滤、识别、采集、展示,为用户提过多种脏数据处理模式;
精确的速度控制
健壮的容错机制,包括线程内部重试、线程级别重试;
插件视角看框架
Job:是DataX用来描述从一个源头到目的的同步作业,是DataX数据同步的最小业务单元;
Task:为最大化而把Job拆分得到最小的执行单元,进行并发执行;
TaskGroup:一组Task集合,在同一个TaskGroupContainer执行下的Task集合称为TaskGroup;
JobContainer:Job执行器,负责Job全局拆分、调度、前置语句和后置语句等工作的工作单元。类似Yarn中的JobTracker;
TaskGroupContainer:TaskGroup执行器,负责执行一组Task的工作单元,类似Yarn中的TAskTacker。
即,Job拆分为Task,分别在框架提供的容器中执行,插件只需要实现Job和Task两部分逻辑。
物理执行有三种运行模式:
Standalone:单进程运行,没有外部依赖;
Local:单进程运行,统计信息,错误信息汇报到集中存储;
Distrubuted:分布式多线程运行,依赖DataX Service服务;
总体来说,当JobContainer和TaskGroupContainer运行在同一个进程内的时候就是单机模式,在不同进程执行就是分布式模式。
类型 |
数据源 |
Reader(读) |
Writer(写) |
文档 |
RDBMS 关系型数据库 |
MySQL |
√ |
√ |
读 、写 |
Oracle |
√ |
√ |
读 、写 |
|
SQLServer |
√ |
√ |
读 、写 |
|
PostgreSQL |
√ |
√ |
读 、写 |
|
DRDS |
√ |
√ |
读 、写 |
|
通用RDBMS(支持所有关系型数据库) |
√ |
√ |
读 、写 |
|
阿里云数仓数据存储 |
ODPS |
√ |
√ |
读 、写 |
ADS |
√ |
写 |
||
OSS |
√ |
√ |
读 、写 |
|
OCS |
√ |
√ |
读 、写 |
|
NoSQL数据存储 |
OTS |
√ |
√ |
读 、写 |
Hbase0.94 |
√ |
√ |
读 、写 |
|
Hbase1.1 |
√ |
√ |
读 、写 |
|
Phoenix4.x |
√ |
√ |
读 、写 |
|
Phoenix5.x |
√ |
√ |
读 、写 |
|
MongoDB |
√ |
√ |
读 、写 |
|
Hive |
√ |
√ |
读 、写 |
|
无结构化数据存储 |
TxtFile |
√ |
√ |
读 、写 |
FTP |
√ |
√ |
读 、写 |
|
HDFS |
√ |
√ |
读 、写 |
|
Elasticsearch |
√ |
写 |
||
时间序列数据库 |
OpenTSDB |
√ |
读 |
|
TSDB |
√ |
写 |
插件开发指南:https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md
使用:
环境要求:
Java 8(jdk版本建议1.8.201以上)
Python2.7(Pythonn3和Python2同时安装的可以修改执行文件名)
Database: Mysql5.7
(1)、下载DataX源码:
git clone git@github.com:alibaba/DataX.git
(2)、通过maven打包:
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
结构如下:
bin conf job lib log log_perf plugin script tmp
注意连接数据库的jar(可替换成自己对应的数据库版本)
可视化界面:
(1)、下载datax-web源码
git clone https://github.com/WeiYe-Jing/datax-web.git
(2)、创建数据库
执行bin/db下面的datax_web.sql文件
(3)、修改配置
1.修改datax_admin下resources/application.yml文件
2.修改datax_executor下resources/application.yml文件
启动项目:
1.本地idea开发环境
运行datax_admin下 DataXAdminApplication
运行datax_executor下 DataXExecutorApplication
启动成功后打开页面(默认管理员用户名:admin 密码:123456)
http://localhost:8080/index.html#/dashboard
2.配置执行
创建项目
创建数据源库与目标库
创建定时同步任务模板
创建同步规则Json
执行同步任务
参数配置说明:
同步规则Json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [//需要同步的字段
"district_code",
"district_name"
],
"connection": [//源数据库连接
{
"jdbcUrl": [
"jdbc:mysql://localhost:3306/cci?characterEncoding=utf8&allowPublicKeyRetrieval=true&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC"
],
"table": [//源表
"std_com_code_district"
]
}
],
"password": "123456",
"username": "root",
"where": ""
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [//目标字段
"district_code",
"district_name"
],
"connection": [//目标数据库连接
{
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/cci_canal?useUnicode=true&characterEncoding=utf8&useSSL=false",
"table": [
"syn_table_test"
]
}
],
"password": "123456",
"preSql": [],
"session": [],
"username": "root",
"writeMode": "insert" //写入模式
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
效果:
每天定时同步数据
源表
目标表:
总结:
Datax适合离线全量同步,对于增量同步也有伪方案:
MysqlReader使用JDBC SELECT语句完成数据抽取工作,因此可以使用SELECT…WHERE…进行增量数据抽取,方式有多种:
数据库在线应用写入数据库时,填充modify字段为更改时间戳,包括新增、更新、删除(逻辑删)。对于这类应用,MysqlReader只需要WHERE条件跟上一同步阶段时间戳即可。
对于新增流水型数据,MysqlReader可以WHERE条件后跟上一阶段最大自增ID即可。
对于业务上无字段区分新增、修改数据情况,MysqlReader也无法进行增量数据同步,只能同步全量数据。
以上是datax数据同步,以后数据同步就不需要自己写定时任务了,简单的配一配,复杂的在这基础上再开发。完美!
2.增量实时同步canal
开源的组件,已经实现模拟成一个mysql的slave,拉取binlog的服务:
阿里巴巴开源的canal
美团开源的puma
linkedin开源的databus
增量同步服务要考虑的问题:
如何解决重复插入
如何解决唯一索引冲突
对于DDL语句如何处理
如何解决数据回环问题
canal/otter基于数据库的日志解析,获取增量变更进行同步,可以实现增量订阅&消费的业务。canal的最新版本已经实现了GTID,数据重复插入,数据回环。可以进行DDL语句过滤。
应用场景:
数据库镜像
数据库实时备份
多级索引 (卖家和买家各自分库索引)
search build
业务cache刷新
价格变化等重要业务消息
原理:
canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
mysql master收到dump请求,开始推送binary log给slave(也就是canal)
canal解析binary log对象(原始为byte流)
Canal内部组件解析
-
Canal 节点,可以有多个instances,每个instance在运行时为一个单独的Spring Context,对象实例为“CanalInstanceWithSpring”。 -
每个instances有一个单独的线程处理整个数据流过程。 instance 内部有EventParser、EventSink、EventStore、metaManager主要四个组件构成,当然还有其他的守护组件比如monitor、HA心跳检测、ZK事件监听等。对象实例初始化和依赖关系,可以参见“default-instance.xml”,其配置模式为普通的Spring。(源码参见:SpringCanalInstanceGenerator)
Canal server的HA机制: canal的ha分为两部分,canal server和canal client分别有对应的ha实现canal server: 为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态.
canal client: 为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。整个HA机制的控制主要是依赖了zookeeper的几个特性,watcher和EPHEMERAL节点(和session生命周期绑定)
-
Parser主要用于解析指定"数据库"的binlog,内部基于JAVA实现的“binlog dump”、“show master status”等。Parser会与ZK交互,并获取当前instance所有消费者的cursor,并获其最小值,作为此instance解析binlog的起始position。目前的实现,一个instance同时只能有一个consumer处于active消费状态,ClientId为定值“1001”,“cursor”中包含consumer消费binlog的position,数字类型。有次可见,Canal instance本身并没有保存binlog的position,Parser中继操作是根据consumer的消费cursor位置来决定;对于信息缺失时,比如Canal集群初次online,且在“default-instance.xml”中也没有指定“masterPositiion”信息(每个instance.properties是可以指定起始position的),那么将根据“show master status”指令获取当前binlog的最后位置。 (源码:MysqlEventParser.findStartPosition()) mysql的Binlay log网络协议:
-
Parser每次、批量获取一定条数的binlog,将binlog数据封装成event,并经由EventSink将消息转发给EventStore,Sink的作用就是“协调Parser和Store”,确保binglog的解析速率与Store队列容量相容。
-
EventStore,用于暂存“尚未消费”的events的存储队列,默认基于内存的阻塞队列实现。Store中的数据由Sink组件提交入队,有NettyServer服务的消费者消费确认后出队,队列的容量和容量模式由“canal.properties”中的“memory”相关配置决定。当Store中容量溢满时,将会阻塞Sink操作(间接阻塞Parser),所以消费者的效能会直接影响instance的同步效率。借鉴了Disruptor的RingBuffer的实现思路。
-
metaManager :主要用于保存Parser组件、CanalServer(即本文中提到的NettyServer)、Canal Instances的meta数据,其中Parser组件涉及到的binlog position、CanalServer与消费者交互时ACK的Cursor信息、instance的集群运行时信息等。根据官方解释,我们在production级别、高可靠业务要求场景下,metaManager建议基于Zookeeper实现。 其中有关Position信息由CanalLogPositionManager类负责,其实现类有多个,在Cluster模式下,建议基于FailbackLogPositionManager,其内部有“primary”、“failback”两级组合,优先基于primary来存取Position,只有当primary异常时会“降级”使用failback;其配置模式,建议与“default-instance.xml”保持一致。
部署使用:
canal的作用就是类似于前面所述的binlog syncer,拉取解析binlog。otter是canal的客户端,专门用于进行数据同步,类似于前文所讲解的sql writer。
安装步骤:
canal+kafka+mysql+canal-admin联合部署
docker run --name localdb_mysql
-v D:docker_datamysqlmy.cnf:/etc/mysql/conf.d
-v D:docker_datamysqllogs:/logs
-v D:docker_datamysqldata:/var/lib/mysql
-e MYSQL_ROOT_PASSWORD=123456 -d -i -p 3306:3306 mysql:5.7.18
mysql数据库配置(docker安装可以使用echo写入)
cd /etc/mysql/mysql.conf.d
echo -e '[mysqld] pid-file = /var/run/mysqld/mysqld.pid socket = /var/run/mysqld/mysqld.sock datadir = /var/lib/mysql symbolic-links=0 server-id = 1 log-bin = binlog log-bin-index = binlog.index'>mysqld.cnf
#添加这一行就ok
log-bin=mysql-bin
#选择row模式
binlog-format=ROW
#配置mysql replaction需要定义,不能和canal的slaveId重复
server_id=1
创建同步账号
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
//单机测试暂时不用安装zk和kafka
安装zookeeper
docker run
--privileged=true -d
--name zookeeper
--publish 2181:2181 -d zookeeper:latest
安装zookeeper客户端 =》zkui 或者 prettyZoo
安装kafka
docker run -d --name kafka
--publish 9092:9092 --link zookeeper
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
--env KAFKA_ADVERTISED_HOST_NAME=localhost
--env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka
安装kafka管理界面
docker run -d --name kafka-manager
--link zookeeper:zookeeper
--link kafka:kafka -p 9001:9000
--restart=always
--env ZK_HOSTS=zookeeper:2181
sheepkiller/kafka-manager
启动canal、canal-admin、canal-adapter
git clone https://github.com/alibaba/canal.git
测试kafka安装成功:
//创建topic
bin/kafka-topics.sh --create --zookeeper zookeeper :2181 --replication-factor 1 --partitions 1 --topic mykafka
//查看topic
bin/kafka-topics.sh --list --zookeeper zookeeper :2181
//创建生产者
bin/kafka-console-producer.sh --broker-list zookeeper :9092 --topic mykafka
//创建消费者
bin/kafka-console-consumer.sh --zookeeper zookeeper :2181 --topic mykafka --from-beginning
配置:
canal配置:
# 当前canal节点部署的instances列表,以“,”分割
#比如:test,example
canal.destinations= example
#canal配置文件主目录,保持默认即可。
#除非你为了提高canal的动态管理能力,将conf文件迁移到了其他目录(比如NFS目录等)
canal.conf.dir = ../conf
是否开启“instance”配置修改自动扫描和重载
#1)conf.dir目录下新增、删除instance配置目录
#2)instance配置目录下的instance.properties变更
#不包含:canal.properties,spring/*.xml的配置变更
#如果环境隔离、测试充分的环境下,或者应用试用初期,可以开启
#对于高风险项目,建议关闭。
canal.auto.scan = true
canal.auto.scan.interval = 5
#instance管理模式,Production级别我们要求使用spring
canal.instance.global.mode = spring
#直接初始化和启动instance
canal.instance.global.lazy = false
#Production级别,HA模式下,基于default-instance.xml
#需要即备的ZK集群,且不应该修改此文件的默认配置。
#如果有自定义的场景,应该新建${instance}-instance.xml文件
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
#canal server的唯一标识,没有实际意义,但是我们建议同一个cluster上的不同节点,其ID尽可能唯一(后续升级)
#数字类型
canal.id = 1
#canal server因为binding的本地IP地址,建议使用内网(唯一,集群可见,consumer可见)IP地址,比如“10.0.1.21”。
此IP主要为canalServer提供TCP服务而使用,将会被注册到ZK中,Consumer将与此IP建立连接。
canal.ip =
#conal server的TCP端口
canal.port = 11111
#Production场景,HA模式下,比如使用ZK作为服务管理,此处至少指定“多数派ZK Node”的IP列表
#如果你的多个Canal Cluster共享ZK,那么每个Canal还需要使用唯一的“rootpath”。
canal.zkServers = 10.0.1.21:2818,10.0.1.22,10.0.2.21:2818/canal/g1
flush data to zk
#适用于metaManager,基于period模式
#metaManager优先将数据(position)保存在内存,然后定时、间歇性的将数据同步到ZK中。
#此参数用于控制同步的时间间隔,建议为“1000”(1S),单位:ms。
#运维或者架构师,应该观察ZK的效能,如果TPS过于频繁,可以提高此值、或者按Canal集群分离ZK集群。
#目前架构下,Consumer向CanalServer提交ACK时会导致ZK数据的同步。
canal.zookeeper.flush.period = 1000
#canal将parse、position数据写入的本地文件目录,HA环境下无效。
#(file-instance.xml)
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
#内存模式,EventStore为Memory类型时。(default-instance.xml)
#可选值:
#1) MEMSIZE 根据buffer.size * buffer.memunit的大小,限制缓存记录的大小,简答来说,就是内存容量大小限制
#2) ITEMSIZE 根据buffer.size进行限制,简单来说,就是根据event的条数限制。
#如果Canal上的instances个数有限,且Consumer的消费效率很高,甚至接近或者高于binlog解析效率,那么可以适度增加memory有关的数值。
#此外batchMode还与消费者的batchSize有些关系,消费者每次能消费的数据量,取决于此mode。
#如果mode为itemSize,则consumer每次获取的消息的条数为batchSize条。
#如果mode为memSize,那么consumer消费的数据总量为batchSize * memunit
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.buffer.size = 16384
canal.instance.memory.buffer.memunit = 1024
所能支撑的事务的最大长度,超过阈值之后,一个事务的消息将会被拆分,并多次提交到eventStore中,但是将无法保证事务的完整性
canal.instance.transaction.size = 1024
当instance.properties配置文件中指定“master”、“standby”时,当canal与“master”联通性故障时,触发连接源的切换,
#那么切换时,在新的mysql库上查找binlog时需要往前“回退”查找的时间,单位:秒。
#良好架构下,我们建议不使用“standby”,限定一个数据库源。因为多个源时,数据库的调整频繁、协调不足,可能会引入一些数据问题。
canal.instance.fallbackIntervalInSeconds = 60
# 有关HA心跳检测部分,主要用在Parser管理dump连接时使用。
# 我们在HA环境时建议开启。
canal.instance.detecting.enable = true
如果你需要限定某个database的可用性验证(比如库锁),
最好使用复杂的、有效的SQL,比如:insert into {database}.{tmpTable} ....
canal.instance.detecting.sql = select 1
#心跳检测频率,单位秒
canal.instance.detecting.interval.time = 6
#重试次数
#非常注意:interval.time * retry.threshold值,应该参考既往DBA同学对数据库的故障恢复时间,
#“太短”会导致集群运行态角色“多跳”;“太长”失去了活性检测的意义,导致集群的敏感度降低,Consumer断路可能性增加。
canal.instance.detecting.retry.threshold = 5
如果在instance.properties配置了“master”、“standby”,且此参数开启时,在“探测失败”后,会选择备库进行binlog获取
建议关闭
canal.instance.detecting.heartbeatHaEnable = false
CanalServer、instance有关的TCP网络配置,建议保持抱人
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
Parser组件,有关binlog解析的过滤
#是否过滤dcl语句,比如“grant/create user”等
canal.instance.filter.query.dcl = false
#dml语句:insert/update/delete等
canal.instance.filter.query.dml = false
#ddl语句:create table/alter table/drop table以及一些index变更
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
binlog格式和“镜像”格式检测,建议保持默认
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
ddl是否隔离发送,保持默认
canal.instance.get.ddl.isolation = false
1.instance列表定义 (列出当前server上有多少个instance,每个instance的加载方式)
参数名字 |
参数说明 |
默认值 |
canal.destinations |
当前server上部署的instance列表 |
无 |
canal.conf.dir |
conf/目录所在的路径 |
../conf |
canal.auto.scan |
开启instance自动扫描 如果配置为true,canal.conf.dir目录下的instance配置变化会自动触发: a. instance目录新增:触发instance配置载入,lazy为true时则自动启动 b. instance目录删除:卸载对应instance配置,如已启动则进行关闭 c. instance.properties文件变化:reload instance配置,如已启动自动进行重启操作 |
true |
canal.auto.scan.interval |
instance自动扫描的间隔时间,单位秒 |
5 |
canal.instance.global.mode |
全局配置加载方式 |
spring |
canal.instance.global.lazy |
全局lazy模式 |
false |
canal.instance.global.manager.address |
全局的manager配置方式的链接信息 |
无 |
canal.instance.global.spring.xml |
全局的spring配置方式的组件文件 |
classpath:spring/file-instance.xml (spring目录相对于canal.conf.dir) |
canal.instance.example.mode canal.instance.example.lazy canal.instance.example.spring.xml ….. |
instance级别的配置定义,如有配置,会自动覆盖全局配置定义模式 命名规则:canal.instance.{name}.xxx |
无 |
canal.instance.tsdb.spring.xml |
v1.0.25版本新增,全局的tsdb配置方式的组件文件 |
classpath:spring/tsdb/h2-tsdb.xml (spring目录相对于canal.conf.dir) |
2.common参数定义,比如可以将instance.properties的公用参数,抽取放置到这里,这样每个instance启动的时候就可以共享.
参数名字 |
参数说明 |
默认值 |
canal.id |
每个canal server实例的唯一标识,暂无实际意义 |
1 |
canal.ip |
canal server绑定的本地IP信息,如果不配置,默认选择一个本机IP进行启动服务 |
无 |
canal.port |
canal server提供socket服务的端口 |
11111 |
canal.zkServers |
canal server链接zookeeper集群的链接信息 例子:127.0.0.1:2181,127.0.0.1:2182 |
无 |
canal.zookeeper.flush.period |
canal持久化数据到zookeeper上的更新频率,单位毫秒 |
1000 |
canal.file.data.dir |
canal持久化数据到file上的目录 |
../conf (默认和instance.properties为同一目录,方便运维和备份) |
canal.file.flush.period |
canal持久化数据到file上的更新频率,单位毫秒 |
1000 |
canal.instance.memory.batch.mode |
canal内存store中数据缓存模式 1. ITEMSIZE : 根据buffer.size进行限制,只限制记录的数量 2. MEMSIZE : 根据buffer.size * buffer.memunit的大小,限制缓存记录的大小 |
MEMSIZE |
canal.instance.memory.buffer.size |
canal内存store中可缓存buffer记录数,需要为2的指数 |
16384 |
canal.instance.memory.buffer.memunit |
内存记录的单位大小,默认1KB,和buffer.size组合决定最终的内存使用大小 |
1024 |
canal.instance.transactionn.size |
最大事务完整解析的长度支持 超过该长度后,一个事务可能会被拆分成多次提交到canal store中,无法保证事务的完整可见性 |
1024 |
canal.instance.fallbackIntervalInSeconds |
canal发生mysql切换时,在新的mysql库上查找binlog时需要往前查找的时间,单位秒 说明:mysql主备库可能存在解析延迟或者时钟不统一,需要回退一段时间,保证数据不丢 |
60 |
canal.instance.detecting.enable |
是否开启心跳检查 |
false |
canal.instance.detecting.sql |
心跳检查sql |
insert into retl.xdual values(1,now()) on duplicate key update x=now() |
canal.instance.detecting.interval.time |
心跳检查频率,单位秒 |
3 |
canal.instance.detecting.retry.threshold |
心跳检查失败重试次数 |
3 |
canal.instance.detecting.heartbeatHaEnable |
心跳检查失败后,是否开启自动mysql自动切换 说明:比如心跳检查失败超过阀值后,如果该配置为true,canal就会自动链到mysql备库获取binlog数据 |
false |
canal.instance.network.receiveBufferSize |
网络链接参数,SocketOptions.SO_RCVBUF |
16384 |
canal.instance.network.sendBufferSize |
网络链接参数,SocketOptions.SO_SNDBUF |
16384 |
canal.instance.network.soTimeout |
网络链接参数,SocketOptions.SO_TIMEOUT |
30 |
canal.instance.filter.query.dcl |
是否忽略DCL的query语句,比如grant/create user等 |
false |
canal.instance.filter.query.dml |
是否忽略DML的query语句,比如insert/update/delete table.(mysql5.6的ROW模式可以包含statement模式的query记录) |
false |
canal.instance.filter.query.ddl |
是否忽略DDL的query语句,比如create table/alater table/drop table/rename table/create index/drop index. (目前支持的ddl类型主要为table级别的操作,create databases/trigger/procedure暂时划分为dcl类型) |
false |
canal.instance.filter.druid.ddl |
v1.0.25版本新增,是否启用druid的DDL parse的过滤,基于sql的完整parser可以解决之前基于正则匹配补全的问题,默认为true |
true |
canal.instance.get.ddl.isolation |
ddl语句是否隔离发送,开启隔离可保证每次只返回发送一条ddl数据,不和其他dml语句混合返回.(otter ddl同步使用) |
false |
conf/example/instance.properties配置文件:
在canal.properties定义了canal.destinations后,需要在canal.conf.dir对应的目录下建立同名的文件。
如果canal.properties未定义instance列表,但开启了canal.auto.scan时
server第一次启动时,会自动扫描conf目录下,将文件名做为instance name,启动对应的instance
server运行过程中,会根据canal.auto.scan.interval定义的频率,进行扫描
1. 发现目录有新增,启动新的instance
2. 发现目录有删除,关闭老的instance
3. 发现对应目录的instance.properties有变化,重启instance
instance.properties参数列表:
# 每个instance都会伪装成一个mysql slave,
# 考虑到binlog同步的机制,我们需要指定slaveId,注意此ID对于此canal前端的MySQL实例而言,必须是唯一的。
# 同一个Canal cluster中相同instance,此slaveId应该一样。
# 我们约定,所有Canal的instance,其slaveId以“1111”开头,后面补充四位数字。
canal.instance.mysql.slaveId = 11110001
数据库相关:master库
#备注,master并不是要求是“MySQL 数据库Master”,
# 而是Canal instance集群模式下,HA运行态中“master”(首选节点)
# 当在故障恢复、Canal迁移时,我们需要手动指定binlog名称以及postition或者timestamp,确保新Canal不会丢失数据。
# 数据库实例地址,ip:port
canal.instance.master.address = 127.0.0.1:3306
#指定起始的binlog文件名,保持默认
canal.instance.master.journal.name =
#此binlog文件的position位置(offset),数字类型。获取此position之后的数据。
canal.instance.master.position =
#此binlog的起始时间戳,获取此timestamp之后的数据。
canal.instance.master.timestamp =
#standby库
#考虑到我司现状,暂不使用standby
canal.instance.standby.address =
canal.instance.standby.journal.name =
canal.instance.standby.position =
canal.instance.standby.timestamp =
数据库连接的用户名和密码
貌似Consumer与CanalServer建立连接时也用的是此用户名和密码
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
默认数据库
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
schema过滤规则,类似于MySQL binlog的filter
canal将会过滤那些不符合要求的table,这些table的数据将不会被解析和传送
filter格式,Consumer端可以指定,只不过是后置的。
# 无论是CanalServer还是Consumer,只要有一方指定了filter都会生效,consumer端如果指定,则会覆盖CanalServer端。
canal.instance.filter.regex = .*\..*
table black regex
canal.instance.filter.black.regex =
参数名字 |
参数说明 |
默认值 |
canal.instance.mysql.slaveId |
mysql集群配置中的serverId概念,需要保证和当前mysql集群中id唯一 |
1234 |
canal.instance.master.address |
127.0.0.1:3306 |
|
canal.instance.master.journal.name |
mysql主库链接时起始的binlog文件 |
无 |
canal.instance.master.position |
mysql主库链接时起始的binlog偏移量 |
无 |
canal.instance.master.timestamp |
mysql主库链接时起始的binlog的时间戳 |
无 |
canal.instance.dbUsername |
mysql数据库帐号 |
canal |
canal.instance.dbPassword |
mysql数据库密码 |
canal |
canal.instance.defaultDatabaseName |
mysql链接时默认schema |
|
canal.instance.connectionCharset |
mysql 数据解析编码 |
UTF-8 |
canal.instance.filter.regex |
mysql 数据解析关注的表,Perl正则表达式. 多个正则之间以逗号(,)分隔,转义符需要双斜杠(\) 常见例子: 1. 所有表:.* or .*\..* 2. canal schema下所有表:canal\..* 3. canal下的以canal打头的表:canal\.canal.* 4. canal schema下的一张表:canal.test1 5. 多个规则组合使用:canal\..*,mysql.test1,mysql.test2 (逗号分隔) 注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤) |
.*\..* |
canal.instance.tsdb.enable |
v1.0.25版本新增,是否开启table meta的时间序列版本记录功能 |
true |
canal.instance.tsdb.dir |
v1.0.25版本新增,table meta的时间序列版本的本地存储路径,默认为instance目录 |
{canal.instance.destination:} |
canal.instance.tsdb.url |
v1.0.25版本新增,table meta的时间序列版本存储的数据库链接串,比如例子为本地嵌入式数据库 |
jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL; |
canal.instance.tsdb.dbUsername |
v1.0.25版本新增,table meta的时间序列版本存储的数据库链接账号 |
canal |
canal.instance.tsdb.dbUsername |
v1.0.25版本新增,table meta的时间序列版本存储的数据库链接密码 |
canal |
说明:
mysql链接时的起始位置
canalinstance.master.journal.name + canal.instance.master.position : 精确指定一个binlog位点,进行启动
canal.instance.master.timestamp : 指定一个时间戳,canal会自动遍历mysql binlog,找到对应时间戳的binlog位点后,进行启动
不指定任何信息:默认从当前数据库的位点,进行启动。(show master status)
测试简单配置:
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
适配器配置:
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mys
concurrent: true
dbMapping:
#源数据库
database: source
#源表
table: source_test
#目标表
targetTable: target_test
targetPk:
#源表和目标表的主键对应关系
id: id
# mapAll: true
targetColumns:
name: name
目标:
对源表进行增删改操作同步到目标表
启动canal
启动rdb适配器=》同步mysql
canal-admin进行配置和实例管理
对源表插入数据
适配器日志:
2021-02-23 13:24:53.987 [pool-1-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":3,"name":"user3"},"database":"source","destination":"example","old":null,"table":"source_test","type":"INSERT"}
目标表:
项目中订阅:
初步看,如果要实现异地双活,实时同步这样的需求,这两款组件貌似都没满足要求,但也不能完全无视其其他用途,比如canal配置zk和使用kafka,可以实现一对多订阅,实时同步mysql数据,刷redis缓存,elasticsearch搜索库等常用组件,效果上还是挺实用的。
下一步整理下源码上写的有哪些可取的亮点特色。
以上是关于数据同步中间件服务的主要内容,如果未能解决你的问题,请参考以下文章
Express实战 - 应用案例- realworld-API - 路由设计 - mongoose - 数据验证 - 密码加密 - 登录接口 - 身份认证 - token - 增删改查API(代码片段