开源数据同步工具DataX

Posted Carlton Xu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了开源数据同步工具DataX相关的知识,希望对你有一定的参考价值。

1. DataX

1.1. 产品特性

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 mysql、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。

官方提供的datax框架图:

最终把不同数据源和目标源组成的网状结构,变成了星型结构:

1.2. 支持场景

DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下图,详情请点击:DataX数据源参考指南

类型数据源Reader(读)Writer(写)文档
RDBMS 关系型数据库MySQL
Oracle
SQLServer
PostgreSQL
DRDS
通用RDBMS(支持所有关系型数据库)
阿里云数仓数据存储ODPS
ADS
OSS
OCS
NoSQL数据存储OTS
Hbase0.94
Hbase1.1
Phoenix4.x
Phoenix5.x
MongoDB
Hive
Cassandra
无结构化数据存储TxtFile
FTP
HDFS
Elasticsearch
时间序列数据库OpenTSDB
TSDB

1.3. 迁移场景解决方案

1.3.1. 迁移测试场景

当前测试均选用阿里云RDS Mysql5.6作为测试源端以及目标端资源,DataX数据源可支持范围很大,具体支持场景可以参考本文档3段落部分

1.3.1.1. 待迁移源端数据库

数据库类型数据源数据库版本资源大小部署方式
关系型数据库阿里云RDSMysql 5.62C-4G-50G单主库

1.3.1.2. 待同步目标端数据库

数据库类型数据源数据库版本资源大小部署方式
关系型数据库阿里云RDSMysql 5.62C-4G-50G单主库

1.3.1.3. 迁移程序

  • 环境要求:linux(windows也可以)、JDK1.8级以上、 python 2.x
  • 本次测试环境:CentOS7.5、JDK1.8、Python2.7.5

1.3.2. 安装部署

部署两种方式

  • 使用官方编译好的 工具 包(datax.tar.gz),解压即用
  • 下载Datax源码,使用Maven进行编译。编译时间会有点长。

本次测试使用官方编译好的工具包(datax.tar.gz)下载并解压后使用

1.3.2.1. 安装JDK环境

yum install -y java-1.8.0 # 使用默认的CentOS7.5 Yum源即可

1.3.2.2. 安装apache-maven环境

yum install -y maven # 使用默认的CentOS7.5 Yum源即可

1.3.2.3. 安装Python环境

CentOS7.5 操作系统默认自带python 2.7.5,无需进行安装,可以直接进行使用

1.3.2.4. 下载DataX安装包

DataX安装包链接:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

curl -O http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

1.3.2.5. 解压缩datax程序包

cd /root/  # 登陆到datax.tar.gz的下载目录
tar -zxvf datax.tar.gz -C /root/

1.3.3. 迁移使用

1.3.3.1. 生成迁移数据源样例模版

按照迁移需求使用DataX指令生成迁移数据源配置样例模版,然后可以根据数据源样例模版进行修改保存使用
执行以下指令生成数据源json文件,此自动生成的配置样例只是模版,还需要将输出json文件内容按照提示保存后进行修改(其他异构数据源模版,按照需求自主生成)

python /root/datax/bin/datax.py -r mysqlreader -w mysqlwriter  

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the mysqlreader document:
     https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

Please refer to the mysqlwriter document:
     https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md

Please save the following configuration as a json file and  use
     python DATAX_HOME/bin/datax.py JSON_FILE_NAME.json
to run the job.


# 复制以下json内容并保存成xxxx.json文件


    "job": 
        "content": [
            
                "reader": 
                    "name": "mysqlreader",
                    "parameter": 
                        "column": [],
                        "connection": [
                            
                                "jdbcUrl": [],
                                "table": []
                            
                        ],
                        "password": "",
                        "username": "",
                        "where": ""
                    
                ,
                "writer": 
                    "name": "mysqlwriter",
                    "parameter": 
                        "column": [],
                        "connection": [
                            
                                "jdbcUrl": "",
                                "table": []
                            
                        ],
                        "password": "",
                        "preSql": [],
                        "session": [],
                        "username": "",
                        "writeMode": ""
                    
                
            
        ],
        "setting": 
            "speed": 
                "channel": ""
            
        
    

1.3.3.2. 修改数据源样例模版

此处按照我们的测试样例,源为mysql,目标也为mysql,mysql2mysql.json文件样例内容如下:


    "job": 
        "content": [
            
                "reader": 
                    "name": "mysqlreader",
                    "parameter": 
                        "column": ["*"],
                        "connection": [
                            
                                "jdbcUrl": ["jdbc:mysql://39.103.21.61:3306/coredb"],
                                "table": ["seepcore_table"]
                            
                        ],
                        "password": "Abc999@1",
                        "username": "xuxingzhuang",
                    
                ,
                "writer": 
                    "name": "mysqlwriter",
                    "parameter": 
                        "column": ["*"],
                        "connection": [
                            
                                "jdbcUrl": "jdbc:mysql://106.15.31.131:3306/coredb",
                                "table": ["seepcore_table"]
                            
                        ],
                        "password": "Abc999@1",
                        "preSql": [],
                        "session": [],
                        "username": "xuxingzhuang",
                        "writeMode": "insert"
                    
                
            
        ],
        "setting": 
            "speed": 
                "channel": "2"
            
        
    

注意⚠️:以上内容,要确保启动datax实例可以有权限访问远程数据库,源和目标库相关配置信息已经创建完成,具体参数详细配置信息,参考以下内容

配置样例mysqlreader参数详细说明,查看链接:DataX MysqlReader
配置样例mysqlwriter参数详细说明,查看链接:DataX MysqlWriter

1.3.3.3. 启动DataX程序

python /root/datax/bin/datax.py /root/datax/job/mysql2mysql.json
  • 程序执行输出LOG

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


2020-07-16 06:25:32.889 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2020-07-16 06:25:32.902 [main] INFO  Engine - the machine info  =>

	osInfo:	Oracle Corporation 1.8 25.252-b09
	jvmInfo:	Linux amd64 3.10.0-862.3.2.el7.x86_64
	cpu num:	4

	totalPhysicalMemory:	-0.00G
	freePhysicalMemory:	-0.00G
	maxFileDescriptorCount:	-1
	currentOpenFileDescriptorCount:	-1

	GC Names	[PS MarkSweep, PS Scavenge]

	MEMORY_NAME                    | allocation_size                | init_size
	PS Eden Space                  | 256.00MB                       | 256.00MB
	Code Cache                     | 240.00MB                       | 2.44MB
	Compressed Class Space         | 1,024.00MB                     | 0.00MB
	PS Survivor Space              | 42.50MB                        | 42.50MB
	PS Old Gen                     | 683.00MB                       | 683.00MB
	Metaspace                      | -0.00MB                        | 0.00MB


2020-07-16 06:25:32.927 [main] INFO  Engine -

	"content":[
		
			"reader":
				"name":"mysqlreader",
				"parameter":
					"column":[
						"*"
					],
					"connection":[
						
							"jdbcUrl":[
								"jdbc:mysql://39.103.21.61:3306/coredb"
							],
							"table":[
								"seepcore_table"
							]
						
					],
					"password":"********",
					"username":"xuxingzhuang"
				
			,
			"writer":
				"name":"mysqlwriter",
				"parameter":
					"column":[
						"*"
					],
					"connection":[
						
							"jdbcUrl":"jdbc:mysql://106.15.31.131:3306/coredb",
							"table":[
								"seepcore_table"
							]
						
					],
					"password":"********",
					"preSql":[],
					"session":[],
					"username":"xuxingzhuang",
					"writeMode":"insert"
				
			
		
	],
	"setting":
		"speed":
			"channel":"2"
		
	


2020-07-16 06:25:32.956 [main] WARN  Engine - prioriy set to 0, because NumberFormatException, the value is: null
2020-07-16 06:25:32.959 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2020-07-16 06:25:32.959 [main] INFO  JobContainer - DataX jobContainer starts job.
2020-07-16 06:25:32.962 [main] INFO  JobContainer - Set jobId = 0
2020-07-16 06:25:33.419 [job-0] INFO  OriginalConfPretreatmentUtil - Available jdbcUrl:jdbc:mysql://39.103.21.61:3306/coredb?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true.
2020-07-16 06:25:33.421 [job-0] WARN  OriginalConfPretreatmentUtil - 您的配置文件中的列配置存在一定的风险. 因为您未配置读取数据库表的列,当您的表字段个数、类型有变动时,可能影响任务正确性甚至会运行出错。请检查您的配置并作出修改.
2020-07-16 06:25:33.929 [job-0] INFO  OriginalConfPretreatmentUtil - table:[seepcore_table] all columns:[
id,date_time,line_3,line_4,line_10,line_11,line_12,line_13,line_14,line_15,line_16,line_17,line_18,line_19,line_20
].
2020-07-16 06:25:33.930 [job-0] WARN  OriginalConfPretreatmentUtil - 您的配置文件中的列配置信息存在风险. 因为您配置的写入数据库表的列为*,当您的表字段个数、类型有变动时,可能影响任务正确性甚至会运行出错。请检查您的配置并作出修改.
2020-07-16 06:25:33.933 [job-0] INFO  OriginalConfPretreatmentUtil - Write data [
insert INTO %s (id,date_time,line_3,line_4,line_10,line_11,line_12,line_13,line_14,line_15,line_16,line_17,line_18,line_19,line_20) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
], which jdbcUrl like:[jdbc:mysql://106.15.31.131:3306/coredb?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true]
2020-07-16 06:25:33.935 [job-0] INFO  JobContainer - jobContainer starts to do prepare ...
2020-07-16 06:25:33.936 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] do prepare work .
2020-07-16 06:25:33.936 [job-0] INFO  JobContainer - DataX Writer.Job [mysqlwriter] do prepare work .
2020-07-16 06:25:33.938 [job-0] INFO  JobContainer - jobContainer starts to do split ...
2020-07-16 06:25:33.938 [job-0] INFO  JobContainer - Job set Channel-Number to 2 channels.
2020-07-16 06:25:33.946 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] splits to [1] tasks.
2020-07-16 06:25:33.947 [job-0] INFO  JobContainer - DataX Writer.Job [mysqlwriter] splits to [1] tasks.
2020-07-16 06:25:33.978 [job-0] INFO  JobContainer - jobContainer starts to do schedule ...
2020-07-16 06:25:33.983 [job-0] INFO  JobContainer - Scheduler starts [1] taskGroups.
2020-07-16 06:25:33.986 [job-0] INFO  JobContainer - Running by standalone Mode.
2020-07-16 06:25:34.002 [taskGroup-0] INFO  TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2020-07-16 06:25:34.010 [taskGroup-0] INFO  Channel - Channel set byte_speed_limit to -1, No bps activated.
2020-07-16 06:25:34.011 [taskGroup-0] INFO  Channel - Channel set record_speed_limit to -1, No tps activated.
2020-07-16 06:25:34.023 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2020-07-16 06:25:34.032 [0-0-0-reader] INFO  CommonRdbmsReader$Task - Begin to read record by Sql: [select * from seepcore_table
] jdbcUrl:[jdbc:mysql://39.103.21.61:3306/coredb?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2020-07-16 06:25:44.018 [job-0] INFO  StandAloneJobContainerCommunicator - Total 0 records, 0 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 0.00%
2020-07-16 06:25:54.023 [job-0] INFO  StandAloneJobContainerCommunicator - Total 109056 records, 23305935 bytes | Speed 2.22MB/s, 10905 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 8.666s |  All Task WaitReaderTime 0.801s | Percentage 0.00%
2020-07-16 06:26:04.026 [job-0] INFO  StandAloneJobContainerCommunicator - Total 231936 records, 49667141 bytes | Speed 2.51MB/s, 12288 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 17.829s |  All Task WaitReaderTime 1.541s | Percentage 0.00%
2020-07-16 06:26:14.029 [job-0] INFO  StandAloneJobContainerCommunicator - Total 354816 records, 75963461 bytes | Speed 2.51MB/s, 12288 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 26.983s |  All Task WaitReaderTime 2.221s | Percentage 0.00%
2020-07-16 06:26:24.032 [job-0] INFO  StandAloneJobContainerCommunicator - Total 467456 records, 100068421 bytes | Speed 2.30MB/s, 11264 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 36.103s |  All Task WaitReaderTime 2.919s | Percentage 0.00%
2020-07-16 06:26:34.034 [job-0] INFO  StandAloneJobContainerCommunicator - Total 584192 records, 125049925 bytes | Speed 2.38MB/s, 11673 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 45.179s |  All Task WaitReaderTime 3.551s | Percentage 0.00%
.....
2020-07-16 06:30:34.113 [job-0] INFO  VMInfo -
	 [delta cpu info] =>
		curDeltaCpu                    | averageCpu                     | maxDeltaCpu                    | minDeltaCpu
		-1.00%                         | -1.00%                         | -1.00%                         | -1.00%


	 [delta memory info] =>
		 NAME                           | used_size                      | used_percent                   | max_used_size                  | max_percent
		 PS Eden Space                  | 309.99MB                       | 95.09%                         | 309.99MB                       | 95.09%
		 Code Cache                     | 6.02MB                         | 79.63%                         | 6.02MB                         | 79.63%
		 Compressed Class Space         | 1.81MB                         | 90.66%                         | 1.81MB                         | 90.66%
		 PS Survivor Space              | 5.53MB                         | 73.75%                         | 5.53MB                         | 73.75%
		 PS Old Gen                     | 6.35MB                         | 0.93%                          | 6.35MB                         | 0.93%
		 Metaspace                      | 18.57MB                        | 97.72%                         | 18.57MB                        | 97.72%

	 [delta gc info] =>
		 NAME                 | curDeltaGCCount    | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | curDeltaGCTime     | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime
		 PS MarkSweep         | 0                  | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s             | 0.000s
		 PS Scavenge          | 100                | 100                | 100                | 100                | 0.854s             | 0.854s             | 0.854s             | 0.854s

2020-07-16 06:30:44.115 [job-0] INFO  StandAloneJobContainerCommunicator - Total 3350592 records, 719410097 bytes | Speed 1.88MB/s, 9171 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 274.631s |  All Task WaitReaderTime 19.540s | Percentage 0.00%
.....

2020-07-16 06:34:21.281 [0-0-0-reader] INFO  CommonRdbmsReader$Task - Finished read record by Sql: [select * from seepcore_table
] jdbcUrl:[jdbc:mysql://39.103.21.61:3306/coredb?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2020-07-16 06:34:21.633 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[527612]ms
2020-07-16 06:34:21.634 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] completed it's tasks.
2020-07-16 06:34:24.169 [job-0] INFO  StandAloneJobContainerCommunicator - Total 5585206 records, 1197238078 bytes | Speed 3.49MB/s, 17183 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 484.287s |  All Task WaitReaderTime 32.298s | Percentage 100.00%
2020-07-16 06:34:24.169 [job-0] INFO  AbstractScheduler - Scheduler accomplished all tasks.
2020-07-16 06:34:24.170 [job-0] INFO  JobContainer - DataX Writer.Job [mysqlwriter] do post work.
2020-07-16 06:34:24.171 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] do post work.
2020-07-16 06:34:24.172 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.
2020-07-16 06:34:24.177 [job-0] INFO  HookInvoker - No hook invoked, because base dir not exists or is a file: /root/datax/hook
2020-07-16 06:34:24.178 [job-0] INFO  JobContainer -
	 [total cpu info] =>
		averageCpu                     | maxDeltaCpu                    | minDeltaCpu
		-1.00%                         | -1.00%                         | -1.00%


	 [total gc info] =>
		 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime
		 PS MarkSweep         | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s
		 PS Scavenge          | 166                | 100                | 66                 | 1.335s             | 0.854s             | 0.481s

2020-07-16 06:34:24.178 [job-0] INFO  JobContainer - PerfTrace not enable!
2020-07-16 06:34:24.178 [job-0] INFO  StandAloneJobContainerCommunicator - Total 5585206 records, 1197238078 bytes | Speed 2.15MB/s, 10538 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 484.287s |  All Task WaitReaderTime 32.298s | Percentage 100.00%
2020-07-16 06:34:24.181 [job-0] INFO  JobContainer -
任务启动时刻                    : 2020-07-16 06:25:32
任务结束时刻                    : 2020-07-16 06:34:24
任务总计耗时                    :                531s
任务平均流量                    :            2.15MB/s
记录写入速度                    :          10538rec/s
读出记录总数                    :             5585206
读写失败总数                    :                   0

以上内容程序执行输出可以看到迁移进度以及迁移资源花费,并且最后还会输出汇总信息,到此为止mysql迁移mysql的已经完成

注意⚠️:当前测试为一张表进行迁移,如果有多张表,请按照需求进行配置多个json文件进行拷贝,datax实例可以cpu可以配置高一些,将json文件的speed.channel调整大一些,并发效果会更好一些

1.4. 迁移测试总结

1.4.1. DataX优势

  • DataX较适合跨数据库表级的数据一次性迁移。
  • 可跨异构数据库,支持多数据源

1.4.2. DataX缺点

  • 无法做增量数据同步,每一次同步都需要清空目标端表格资料。
  • 无法支持实时同步。

以上是关于开源数据同步工具DataX的主要内容,如果未能解决你的问题,请参考以下文章

开源数据同步工具DataX

阿里云开源离线同步工具DataX3.0介绍

关于数据同步工具DataX部署

数据同步工具DataX和DataWeb知识手册,DataX优化

数据同步工具DataX和DataWeb知识手册,DataX优化

阿里又开源一款数据同步工具 DataX,稳定又高效,好用到爆!