数据同步工具DataX和DataWeb知识手册,DataX优化
Posted 爱是与世界平行
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了数据同步工具DataX和DataWeb知识手册,DataX优化相关的知识,希望对你有一定的参考价值。
一、概述
DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 mysql、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数据源之间高效的数据同步功能。
Gitee:github.com/alibaba/Dat…
GitHub地址:github.com/alibaba/Dat…
DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FvHA10yb-1668579645212)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/b79655df94f24f6cac341f34431f3753~tplv-k3u1fbpfcp-zoom-in-crop-mark:4536:0:0:0.image)]
- 为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。
- DataX在阿里巴巴集团内被广泛使用,承担了所有大数据的离线同步业务,并已持续稳定运行了6年之久。目前每天完成同步8w多道作业,每日传输数据量超过300TB。
二、DataX3.0框架设计
DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1H7atcOJ-1668579645214)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/f1ca3340b8704e3eb8adfdfc7c036e02~tplv-k3u1fbpfcp-zoom-in-crop-mark:4536:0:0:0.image)]
- Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
- Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
- Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
三、DataX3.0架构
DataX 3.0 开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RMqQCzvL-1668579645214)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/f90f02c880c942f187a5aa47390b1f0b~tplv-k3u1fbpfcp-zoom-in-crop-mark:4536:0:0:0.image)]
3.1 核心模块介绍
- DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
- DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
- 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
- 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动
Reader—>Channel—>Writer
的线程来完成任务同步工作。 - DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0
3.2 DataX调度流程
举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps(Open Data Processing Service:开发数据处理服务)
里面。 DataX的调度决策思路是:
- DataXJob根据分库分表切分成了100个Task。
- 根据20个并发,DataX计算共需要分配4个TaskGroup。
- 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。
四、环境部署
4.1 下载
$ mkdir -p /opt/datax ; cd /opt/datax
$ wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
4.2 设置环境变量
$ cd /opt/datax
$ vi /etc/profile
export DATAX_HOME=/opt/datax
export PATH=$DATAX_HOME/bin:$PATH
$ source /etc/profile
当自测DataX运行出错时,删除一些临时文件。
rm -fr /opt/datax/plugin/*/._*
五、DataX 实战示例
DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入。DataX目前支持数据如下图,详情请查看GitHub官方文档:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gje4yRjk-1668579645215)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/6751d48bb90e425d9570bb4492f116e6~tplv-k3u1fbpfcp-zoom-in-crop-mark:4536:0:0:0.image)]
4.1 MYSQL to HDFS
4.1.1 准备好库表数据
$ mysql -uroot -p
密码:123456
creta database datax;
CREATE TABLE IF NOT EXISTS `datax`.`person` (
`id` int(10) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`name` VARCHAR(32) COMMENT '用户名',
`age` int(10) COMMENT '年龄',
PRIMARY KEY (`id`)
)ENGINE=INNODB DEFAULT CHARSET=utf8;
insert into person(name,age) values ('person001',18) ,('person002',19),('person003',20),('person004',21),('person005',22);
select * from datax.person;
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VILRqfea-1668579645216)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/96942d5c22924a37bac6593e6d28b088~tplv-k3u1fbpfcp-zoom-in-crop-mark:4536:0:0:0.image)]
4.1.2 配置json文件
$ cd $DATAX_HOME
$ mkdir test
$ cat > ./test/mysql2hdfs <<EOF
"job":
"setting":
"speed":
"channel":1
,
"content": [
"reader":
"name": "mysqlreader",
"parameter":
"username": "root",
"password": "123456",
"connection": [
"querySql": [
"select * from datax.person;"
],
"jdbcUrl": [
"jdbc:mysql://hadoop-node1:3306/datax?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
]
]
,
"writer":
"name": "streamwriter",
"parameter":
"defaultFS": "hdfs://hadoop-node1:8082",
"fileType": "text",
"path": "/tmp/datax/",
"fileName": "person",
"column": [
"name": "id",
"type": "INT"
,
"name": "name",
"type": "STRING"
,
"name": "age",
"type": "INT"
],
"writeMode": "append",
"fieldDelimiter": ","
]
EOF
$ hadoop fs -mkdir /tmp/datax/
4.1.3 执行
$ cd $DATAX_HOME
$ python2 bin/datax.py test/mysql2hdfs
【温馨提示】如果mysql连接不上,请更换对应版本的mysql驱动,
$DATA_HOME/plugin/reader/mysqlreader/libs/mysql-connector-java-*
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eR8mrAME-1668579645217)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/3907fb8a13554536838a12d58316093e~tplv-k3u1fbpfcp-zoom-in-crop-mark:4536:0:0:0.image)]
4.1.4 验证
打开HDFS web检查
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HIJ8u04S-1668579645218)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/57d0d3a697164b6d934a923382181799~tplv-k3u1fbpfcp-zoom-in-crop-mark:4536:0:0:0.image)]
4.2 MYSQL to Hive
4.2.1 准备好hive库表数据
$ beeline -u jdbc:hive2://hadoop-node1:11000 -n root
-- 创建库
CREATE DATABASE datax
-- 创建表时指定库,指定分隔符
CREATE TABLE IF NOT EXISTS datax.hive_person (
id INT COMMENT 'ID',
name STRING COMMENT '名字',
age INT COMMENT '年龄'
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\\n';
4.2.2 配置json文件
【温馨提示】其实这里也是推送数据HDFS文件,只不过时推送到表目录下。只需要将上面的json配置改一行就行了。完整配置如下:
"job":
"setting":
"speed":
"channel":1
,
"content": [
"reader":
"name": "mysqlreader",
"parameter":
"username": "root",
"password": "123456",
"connection": [
"querySql": [
"select * from datax.person;"
],
"jdbcUrl": [
"jdbc:mysql://hadoop-node1:3306/datax?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
]
]
,
"writer":
"name": "hdfswriter",
"parameter":
"defaultFS": "hdfs://hadoop-node1:8082",
"fileType": "text",
"path": "/user/hive/warehouse/datax.db/hive_person",
"fileName": "person",
"column": [
"name": "id",
"type": "INT"
,
"name": "name",
"type": "STRING"
,
"name": "age",
"type": "INT"
],
"writeMode": "append",
"fieldDelimiter": ","
]
JSON模板2:
"job":
"setting":
"speed":
"channel": 3
,
"errorLimit":
"record": 0,
"percentage": 0.02
,
"content": [
"reader":
"name": "mysqlreader",
"parameter":
"username": "用户名",
"password": "密码",
"column": [
"deptno",
"dname",
"loc"
],
"connection": [
"table": [
"dept"
],
"jdbcUrl": [
"jdbc:mysql://IP:3306/test"
]
]
,
"writer":
"name": "hdfswriter",
"parameter":
"defaultFS": "hdfs://hdfs-ha",
"hadoopConfig":
"dfs.nameservices": "hdfs-ha",
"dfs.ha.namenodes.hdfs-ha": "nn1,nn2",
"dfs.namenode.rpc-address.hdfs-ha.nn1": "node01:8020",
"dfs.namenode.rpc-address.hdfs-ha.nn2": "node02:8020",
"dfs.client.failover.proxy.provider.hdfs-ha": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
,
"fileType": "text",
"path": "/user/hive/warehouse/ods.db/datax_dept",
"fileName": "202104",
"column": [
"name": "deptno",
"type": "int"
,
"name": "dname",
"type": "varchar"
,
"name": "loc",
"type": "varchar"
],
"writeMode": "append",
"fieldDelimiter": "\\t"
]
4.2.3 执行
python datax.py mysql2hive.json
登录hive客户端查看hive表数据
$ beeline -u jdbc:hive2://hadoop-node1:11000 -n root
$ select * from datax.hive_person;
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Nl0Eyczw-1668579645218)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/794f994c7a634acfa7d126572ff3a929~tplv-k3u1fbpfcp-zoom-in-crop-mark:4536:0:0:0.image)]
4.3 HDFS to MYSQL
4.3.1 准备好HDFS文件数据
$ cd $DATAX_HOME
$ cat >./test/person2.txt<<EOF
1,p1,21
2,p2,22
3,p3,30
4,p4,35
5,p5,31
6,p6,33
EOF
# 将文件推送到HDFS上
$ hadoop fs -put ./test/person2.txt /tmp/datax/
4.3.2 准备好MySQL表
CREATE TABLE IF NOT EXISTS `datax`.`person2` (
`id` int(10) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`name` VARCHAR(32) COMMENT '用户名',
`age` int(10) COMMENT '年龄',
PRIMARY KEY (`id`)
)ENGINE=INNODB DEFAULT CHARSET=utf8;
4.3.3 配置json文件
$ cat >./test/hdfs2mysql.json<<EOF
"job":
"setting":
"speed":
"channel":1
,
"content": [
"reader":
"name": "hdfsreader",
"parameter":
"path": "/tmp/datax/person2.txt",
"defaultFS": "hdfs://hadoop-node1:8082",
"fileType": "text",
"column": [
"index": 0,
"type": "long"
,
"index": 1,
"type": "string"
,
"index": 2,
"type": "long"
],
"encoding": "UTF-8",
"fieldDelimiter": ","
,
"writer":
"name": "mysqlwriter",
"parameter":
"writeMode": "insert",
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age"
],
"preSql": [
"delete from person2"
],
"connection": [
"jdbcUrl": "jdbc:mysql://hadoop-node1:3306/datax?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true",
"table": [
"person2"
]
]
]
EOF
4.3.4 执行
$ python2 ./bin/datax.py ./test/hdfs2mysql.json
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YehNLa5G-1668579645219)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/4b2162ddee894d00ba15fe157e85ffec~tplv-k3u1fbpfcp-zoom-in-crop-mark:4536:0:0:0.image)]
4.3.5 验证
登录mysql查看
$ mysql -uroot -p
密码:123456
select * from datax.person2;
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-d5EkJHdn-1668579645220)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/a0e79ce7584e4bd88e065520c98a0396~tplv-k3u1fbpfcp-zoom-in-crop-mark:4536:0:0:0.image)]
4.4 DataX到HBase
六、DataX-WEB 安装部署
GitHub地址:github.com/WeiYe-Jing/…
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qLjOIn8U-1668579645220)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/c296925c391e49a09085aaeb1c96e7ae~tplv-k3u1fbpfcp-zoom-in-crop-mark:4536:0:0:0.image)]
1)下载
下载地址:
pan.baidu.com/share/init?… 提取码:cpsk
2)解压
$ cd /opt/bigdata/hadoop/software
$ tar -xf datax-web-2.1.2.tar.gz -C /opt/bigdata/hadoop/server/
3)配置环境变量
$ cd /opt/bigdata/hadoop/server/datax-web-2.1.2
$ vi /etc/profile
export DATAXWEB_HOME=/opt/bigdata/hadoop/server/datax-web-2.1.2
export PATH=$DATAXWEB_HOME/bin:$PATH
$ source /etc/profile
4)创建dataxweb数据库
$ mysql -uroot -p -hhadoop-node1
密码:123456
create database dataxweb;
5)执行一键安装脚本
$ cd $DATAXWEB_HOME
$ ./bin/install.sh
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3lCXNLb9-1668579645221)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/6cba6923fa00414aac2cd93976a444da~tplv-k3u1fbpfcp-zoom-in-crop-mark:4536:0:0:0.image)]
6)修改配置
1、修改datax-admin配置
$ cd $DATAXWEB_HOME
# 修改数据库配置,如果上面配置了,就可以跳过
$ vi ./modules/datax-admin/conf/bootstrap.properties
# 配置环境变量
$ vi ./modules/datax-admin/bin/env.properties
# web端口
SERVER_PORT=18088
# 创建 mybatis-plus打印sql日志默认目录,默认路径:$ $DATAXWEB_HOME/modules/datax-admin/data/applogs/admin,要修改就这个配置文件:$DATAXWEB_HOME/modules/datax-admin/conf/application.yml
$ mkdir -p $DATAXWEB_HOME/modules/datax-admin/data/applogs/admin
2、修改datax-executor配置
$ cd $DATAXWEB_HOME
# 修改数据库配置,如果上面配置了,就可以跳过
$ vi ./modules/datax-executor/conf/bootstrap.properties
# 配置环境变量
$ vi ./modules/datax-executor/bin/env.properties
# 主要修改配置如下:
## PYTHON脚本执行位置
PYTHON_PATH=/opt/bigdata/hadoop/server/datax/bin/datax.py
## 保持和datax-admin端口一致,更datax-admin的SERVER_PORT对应
DATAX_ADMIN_PORT=18088
# 创建 日志默认目录,默认路径:$DATAXWEB_HOME/modules/datax-executor/data/applogs/executor/jobhandler,要修改就这个配置文件:$DATAXWEB_HOME/modules/datax-executor/conf/application.yml
$ mkdir -p $DATAXWEB_HOME/modules/datax-executor/data/applogs/executor/jobhandler
7)启动服务
$ cd $DATAXWEB_HOME
$ ./bin/start-all.sh
# 或者分模块启动
$ ./bin/start.sh -m datax-admin
$ ./bin/start.sh -m datax-executor
# 查看datax-admin启动日志
$DATAXWEB_HOME/modules/datax-admin/bin/console.out
# 查看datax-executor启动日志
$DATAXWEB_HOME/modules/datax-executor/bin/console.out
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-70K3y6r6-1668579645222)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/aa3833d032544a1594b11381b94fddb6~tplv-k3u1fbpfcp-zoom-in-crop-mark:4536:0:0:0.image)]
web访问:http://hadoop-node1:18088/index.html 默认账号/密码:admin/123456
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nZHdeD1s-1668579645223)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/19d373eb574942ebbd2c026b9607e93f~tplv-k3u1fbpfcp-zoom-in-crop-mark:4536:0:0:0.image)]
8)简单使用
前期准备
1、新建项目
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tf2zNdVO-1668579645223)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/423b4703d3a047ab82dcd4515c0f532c~tplv-k3u1fbpfcp-zoom-in-crop-mark:4536:0:0:0.image)]
2、创建hive库和表
$ beeline
create database dataxweb;
CREATE TABLE IF NOT EXISTS dataxweb.hive_person(
id INT COMMENT 'ID',
name STRING COMMENT '名字',
age INT COMMENT '年龄'
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\\n';
3、创建dataxweb person表
CREATE TABLE `dataxweb`.`person` (
`id` int NOT NULL AUTO_INCREMENT COMMENT 'ID',
`name` varchar(32) DEFAULT NULL COMMENT '用户名',
`age` int DEFAULT NULL COMMENT '年龄',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb3;
1、MYSQL to Hive
创建任务
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aI32ymGs-1668579645224)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/0c4f92466832454394d9180a7bdbc3d2~tplv-k3u1fbpfcp-zoom-in-crop-mark:4536:0:0:0.image)]
json配置如下:
"job":
"setting":
"speed":
"channel":1
,
"content": [
"reader":
"name": "mysqlreader",
"parameter":
"username": "root",
"password": "123456",
"connection": [
"querySql": [
"select * from datax.person;"
],
"jdbcUrl": [
"jdbc:mysql://hadoop-node1:3306/dataxweb?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
]
]
,
"writer":
"name": "hdfswriter",
"parameter":
"defaultFS": "hdfs://hadoop-node1:8082",
"fileType": "text",
"path": "/user/hive/warehouse/dataxweb.db/hive_person",
"fileName": "person",
"column": [
"name": "id",
"type": "INT"
,
"name": "name",
"type": "STRING"
,
"name": "age",
"type": "INT"
],
"writeMode": "append",
"fieldDelimiter": ","
]
执行,也可以定时执行
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CcEJevUu-1668579645225)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/95688dc4c087460681af4fc9df36c5e8~tplv-k3u1fbpfcp-zoom-in-crop-mark:4536:0:0:0.image)]
查看日志
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YEGMnRfz-1668579645229)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/1091f21613c547249ed8b120edd7814f~tplv-k3u1fbpfcp-zoom-in-crop-mark:4536:0:0:0.image)]
2、Hive to MYSQL
创建任务
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xC4881e1-1668579645230)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/9cc9a34e4c0a4771ab57d7c802933ec1~tplv-k3u1fbpfcp-zoom-in-crop-mark:4536:0:0:0.image)]
json配置如下:
"job":
"setting":
"speed":
"channel": 1
,
"content": [
"reader":
"name": "hdfsreader",
"parameter":
"path": "/user/hive/warehouse/dataxweb.db/hive_person/person__7c10087d_a834_4558_b830_26322bad724b",
"defaultFS": "hdfs://hadoop-node1:8082",
"fileType": "text",
"column": [
"index": 0,
"type": "long"
,
"index": 1,
"type": "string"
,
"index": 2,
"type"使用 DataX 实现数据同步(高效的数据同步工具)