DolphinScheduler 调度 DataX 实现 MySQL To MySQL 增量数据同步实战

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了DolphinScheduler 调度 DataX 实现 MySQL To MySQL 增量数据同步实战相关的知识,希望对你有一定的参考价值。

DolphinScheduler

背景

mysql库A 到 MySQL库B的增量数据同步需求

DolphinScheduler中配置DataX MySQL To MySQL工作流

工作流定义

工作流定义 > 创建工作流 > 拖入1个SHELL组件 > 拖入1个DATAX组件
SHELL组件(文章)
脚本


echo 文章同步 MySQL To MySQL

DATAX组件(t_article)
用到2个插件mysqlreader[1]、mysqlwriter[2]
选 自定义模板:



"job":
"content": [

"reader":
"name": "mysqlreader",
"parameter":
"connection": [

"jdbcUrl": [
"jdbc:mysql://$biz_mysql_host:$biz_mysql_port/你的数据库A?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8&autoReconnect=true&useSSL=false&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false"
],
"querySql": [
"select a.id,a.title,a.content,a.is_delete,a.delete_date,a.create_date,a.update_date from t_article a.update_date >= $biz_update_dt;"
]

],
"password": "$biz_mysql_password",
"username": "$biz_mysql_username"

,
"writer":
"name": "mysqlwriter",
"parameter":
"column": [
"`id`",
"`title`",
"`content`",
"`is_delete`",
"`delete_date`",
"`create_date`",
"`update_date`"
],
"connection": [

"jdbcUrl": "jdbc:mysql://$biz_mysql_host:$biz_mysql_port/你的数据库B?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8&autoReconnect=true&useSSL=false&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false",
"table": [
"t_article"
]

],
"writeMode": "replace",
"password": "$biz_mysql_password",
"username": "$biz_mysql_username"



],
"setting":
"errorLimit":
"percentage": 0,
"record": 0
,
"speed":
"channel": 1,
"record": 1000



reader和writer的字段配置需保持一致

自定义参数:


biz_update_dt: $global_bizdate
biz_mysql_host: 你的mysql ip
biz_mysql_port: 3306
biz_mysql_username: 你的mysql账号
biz_mysql_password: 你的mysql密码

# 本文实验环境A库和B库用的同一个实例,如果MySQL是多个实例,可以再新增加参数定义例如 biz_mysql_host_b,在模板中对应引用即可

配置的自定义参数将会自动替换json模板中的同名变量


DolphinScheduler

reader mysqlreader插件中关键配置: ​​a.update_date >= $biz_update_dt​​ 就是实现增量同步的关键配置
writer mysqlwriter插件中关键配置: ``


"parameter": 
"writeMode": "replace",
......

writeMode为replace,相同主键id重复写入数据,就会更新数据。sql本质上执行的是 ​​replace into​

保存工作流

全局变量设置
​global_bizdate: $[yyyy-MM-dd 00:00:00-1]​

global_bizdate 引用的变量为 DolphinScheduler 内置变量,具体参考官网文档^[3]
结合调度时间设计好时间滚动的窗口时长,比如按1天增量,那么这里时间就是减1天

DolphinScheduler

最终的工作流DAG图为:

DolphinScheduler

爬坑记录

  • 官网下载的DataX不包含ElasticSearchWriter写插件
    默认不带该插件,需要自己编译ElasticSearchWriter插件。


git clone https://github.com/alibaba/DataX.git

为了加快编译速度,可以只编译​​<module>elasticsearchwriter</module>​​ 项目根目录的pom.xml
全注释掉,下只保留<module>elasticsearchwriter</module>其他注释掉。

参考

​1. DataX MysqlReader 插件文档 https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md​​​​2. DataX MysqlWriter 插件文档 https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md​​​​3. Apache DolphinScheduler 内置参数 https://dolphinscheduler.apache.org/zh-cn/docs/latest/user_doc/guide/parameter/built-in.html​


以上是关于DolphinScheduler 调度 DataX 实现 MySQL To MySQL 增量数据同步实战的主要内容,如果未能解决你的问题,请参考以下文章

使用DolphinScheduler调度Python任务

使用DolphinScheduler调度Python任务

深入了解海豚调度DolphinScheduler

大数据任务调度工具 Apache DolphinScheduler

大数据任务调度工具 Apache DolphinScheduler

大数据任务调度工具 Apache DolphinScheduler