DataX Incremental Data Sync (shell)


For a full (non-incremental) export, see the earlier post "DataX: exporting data from MongoDB to MySQL".

See also the DataX UDF manual.

datax.py mongodb2mysql_inc.json



"job":
"setting":
"speed":
"channel": 4

,
"content": [
"reader":
"name": "mongodbreader",
"parameter":
"address": ["*.*.*.*:27017"],
"userName": "DataXTest",
"userPassword": "123456",
"dbName": "weixin",
"collectionName": "fileids_wxpy",
"column": [
"index":0,
"name": "_id",
"type": "string"
,
"index":1,
"name": "crawler_time",
"type": "string"
,
"index":2,
"name": "file_url",
"type": "string"
,
"index":3,
"name": "flag",
"type": "string"
,
"index":4,
"name": "logo_url",
"type": "string"
,
"index":5,
"name": "source",
"type": "string"
,
"index":6,
"name": "update_date",
"type": "string"
,
"index":7,
"name": "update_time",
"type": "long"
,
"index":8,
"name": "wx_id",
"type": "string"
,
"index":9,
"name": "wx_name",
"type": "string"
]

,
"writer":
"name": "mysqlwriter",
"parameter":
"column": [
"id",
"crawler_time",
"file_url",
"flag",
"logo_url",
"source",
"update_date",
"update_time",
"wx_id",
"wx_name"
],
"connection": [

"jdbcUrl": "jdbc:mysql://*.*.*.*:3306/weixin?characterEncoding=utf8",
"table": ["fileids_wxpy"]

],
"password": "123456",
"username": "root"

,
"transformer": [

"name": "dx_filter",
"parameter":

"columnIndex":1,
"paras":["<","1560493441"]


]
]
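The dx_filter transformer drops every record that matches its condition, so the pair ["<", "1560493441"] discards any document whose crawler_time (column index 1) is below the cutoff; only newer records reach MySQL. The cutoff is a Unix epoch timestamp; with GNU date it can be decoded or generated from the shell, for example:

# decode the epoch literal used above (prints the human-readable local time)
date -d @1560493441
# generate a cutoff epoch, e.g. "everything since yesterday"
date -d "yesterday" +%s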

Run


# requires a Python 2.7 environment
python datax.py mongodb2mysql_inc.json

Run result


2019-06-14 15:22:58.886 [job-0] INFO  JobContainer - PerfTrace not enable!
2019-06-14 15:22:58.886 [job-0] INFO StandAloneJobContainerCommunicator - Total 53 records, 18669 bytes | Speed 93B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Transfermor Success 51848 records | Transformer Error 0 records | Transformer Filter 51795 records | Transformer usedTime 0.000s | Percentage 100.00%
2019-06-14 15:22:58.887 [job-0] INFO JobContainer -
Job start time        : 2019-06-14 15:19:37
Job end time          : 2019-06-14 15:22:58
Total elapsed time    : 201s
Average throughput    : 93B/s
Record write speed    : 0rec/s
Total records read    : 53
Read/write failures   : 0

2019-06-14 15:22:58.887 [job-0] INFO JobContainer -
Transformer success records  : 51848
Transformer failed records   : 0
Transformer filtered records : 51795

The numbers reconcile: 51848 transformer-success records minus 51795 filtered records leaves exactly the 53 records reported as read and written above.

Extension: scheduled (cron-based) sync

  • mysql_max_timestamp2csv.json



"job":
"content": [

"reader":
"name": "mysqlreader",
"parameter":
"connection": [

"jdbcUrl": ["jdbc:mysql://*.*.*.*:x:3306/weixin?characterEncoding=utf8"],
"querySql": [
"SELECT max(crawler_time) FROM fileids_wxpy"
]

],
"password": "123456",
"username": "root"

,
"writer":
"name": "txtfilewriter",
"parameter":
"fileName": "mysql_max_timestamp_result",
"fileFormat": "csv",
"path": "/root/datax/bin",
"writeMode": "truncate"



],
"setting":
"speed":
"channel": 2



  • datax.py mongodb2mysql_inc.json (identical to the file of the same name above except for the filter cutoff, which is fixed here to the literal "timestamp" so the shell script can substitute the real value; see the sed sketch after this config)



"job":
"setting":
"speed":
"channel": 4

,
"content": [
"reader":
"name": "mongodbreader",
"parameter":
"address": ["*.*.*.*:27017"],
"userName": "DataXTest",
"userPassword": "123456",
"dbName": "weixin",
"collectionName": "fileids_wxpy",
"column": [
"index":0,
"name": "_id",
"type": "string"
,
"index":1,
"name": "crawler_time",
"type": "string"
,
"index":2,
"name": "file_url",
"type": "string"
,
"index":3,
"name": "flag",
"type": "string"
,
"index":4,
"name": "logo_url",
"type": "string"
,
"index":5,
"name": "source",
"type": "string"
,
"index":6,
"name": "update_date",
"type": "string"
,
"index":7,
"name": "update_time",
"type": "long"
,
"index":8,
"name": "wx_id",
"type": "string"
,
"index":9,
"name": "wx_name",
"type": "string"
]

,
"writer":
"name": "mysqlwriter",
"parameter":
"column": [
"id",
"crawler_time",
"file_url",
"flag",
"logo_url",
"source",
"update_date",
"update_time",
"wx_id",
"wx_name"
],
"connection": [

"jdbcUrl": "jdbc:mysql://*.*.*.*:3306/weixin?characterEncoding=utf8",
"table": ["fileids_wxpy"]

],
"password": "123456",
"username": "root"

,
"transformer": [

"name": "dx_filter",
"parameter":

"columnIndex":1,
"paras":["<","timestamp"]


]
]
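To see what the substitution produces before wiring it into cron, the sed step from the script below can be run by hand from /root/datax/bin (here 1560493441 stands in for a real max crawler_time):

# replace the "timestamp" placeholder and show the resulting filter line
sed 's/timestamp/1560493441/g' mongodb2mysql_inc.json | grep '"paras"'
# expected output: "paras": ["<", "1560493441"]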

  • cron_datax_mongodb2mysql.sh


#!/bin/bash
# This script assumes the working directory is /root/datax/bin (the crontab entry cd's there first).
# Step 1: query the target MySQL for the max crawler_time already synced
python2 /root/datax/bin/datax.py /root/datax/bin/mysql_max_timestamp2csv.json
# $? holds the exit status of the last command: 0 means success, non-zero failure; -ne means "not equal"
if [ $? -ne 0 ]; then
    echo "minute_data_sync.sh error, can not get max_time from target db!"
    exit 1
fi
# Step 2: locate the text file DataX wrote and read its content into a variable
RESULT_FILE=`ls /root/datax/bin/mysql_max_timestamp_result_*`
MAX_TIME=`cat $RESULT_FILE`
echo "$RESULT_FILE $MAX_TIME"
# Step 3: if the max timestamp is not null, rewrite the sync config and run an incremental update
if [ "$MAX_TIME" != "null" ]; then
    # incremental filter condition
    WHERE="$MAX_TIME"
    # replace the "timestamp" placeholder with the max timestamp of the last sync
    sed "s/timestamp/$WHERE/g" mongodb2mysql_inc.json > mongodb2mysql_inc_tmp.json
    # echo "incremental update"
    python2 /root/datax/bin/datax.py /root/datax/bin/mongodb2mysql_inc_tmp.json
    # remove the temp file
    rm ./mongodb2mysql_inc_tmp.json
else
    # echo "full update"
    python2 /root/datax/bin/datax.py /root/datax/bin/mongodb2mysql.json
fi
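A first manual run before scheduling (the script uses paths relative to /root/datax/bin, which the cron entry below cd's into first):

cd /root/datax/bin
sh cron_datax_mongodb2mysql.sh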
  • Schedule it with Linux's built-in crontab


30 22 * * * cd /root/datax/bin && sh cron_datax_mongodb2mysql.sh >>/root/datax/bin/cron_datax_mongodb2mysql.log
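The entry can also be installed non-interactively, as sketched below; appending 2>&1 (not in the original entry) would additionally capture stderr in the log:

# append the job to the current user's crontab, then verify
( crontab -l 2>/dev/null; echo '30 22 * * * cd /root/datax/bin && sh cron_datax_mongodb2mysql.sh >>/root/datax/bin/cron_datax_mongodb2mysql.log 2>&1' ) | crontab -
crontab -l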
  • Scheduled-run log


vim /root/datax/bin/cron_datax_mongodb2mysql.log
······
2019-06-14 17:14:36.178 [job-0] INFO JobContainer - PerfTrace not enable!
2019-06-14 17:14:36.178 [job-0] INFO StandAloneJobContainerCommunicator - Total 65 records, 22919 bytes | Speed 114B/s, 0 records/s | Error 1 records, 350 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 171.039s | Transfermor Success 52013 records | Transformer Error 0 records | Transformer Filter 51948 records | Transformer usedTime 0.000s | Percentage 100.00%
2019-06-14 17:14:36.179 [job-0] INFO JobContainer -
Job start time        : 2019-06-14 17:11:13
Job end time          : 2019-06-14 17:14:36
Total elapsed time    : 202s
Average throughput    : 114B/s
Record write speed    : 0rec/s
Total records read    : 65
Read/write failures   : 1

2019-06-14 17:14:36.179 [job-0] INFO JobContainer -
Transformer success records  : 52013
Transformer failed records   : 0
Transformer filtered records : 51948

Summary

  • Pros: too many to list here.
  • Cons: DataX has no built-in support for incremental sync, but thanks to its flexible architecture, incremental sync is straightforward to add with a shell script, as shown above.

For source databases whose DataX reader supports querySql, the approach described in "Using DataX to sync data incrementally" is recommended: filtering at the data source keeps unwanted rows from ever being read, which improves sync efficiency considerably. A fragment sketch follows.
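A minimal, hypothetical reader fragment for a MySQL source (table and credentials reused from above purely for illustration), with the incremental filter pushed into querySql so it replaces the dx_filter transformer; the same "timestamp" placeholder is kept so the cron script's sed step still applies:

# write a mysqlreader fragment whose querySql filters at the source
cat > mysqlreader_inc_fragment.json <<'EOF'
{
    "reader": {
        "name": "mysqlreader",
        "parameter": {
            "username": "root",
            "password": "123456",
            "connection": [
                {
                    "jdbcUrl": ["jdbc:mysql://*.*.*.*:3306/weixin?characterEncoding=utf8"],
                    "querySql": ["SELECT * FROM fileids_wxpy WHERE crawler_time >= 'timestamp'"]
                }
            ]
        }
    }
}
EOF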
