DataX入门
Posted 派大星`
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了DataX入门相关的知识,希望对你有一定的参考价值。
目录
9.3.1. 在Hive中建表的时候指定表( NULL DEFINED AS '' )
1. DataX介绍
DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(mysql、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能
源码地址: 点击进入
组件地址: 点击下载
2. DataX支持的常用数据源类型
类型 | 数据源 |
关系型数据库 | MySql |
Oracle | |
SQLServer | |
PostgreSQL | |
NoSql数据存储 | HBase 0.94 / 1.1 |
Phoenix 4.x / 5.x | |
MongoDB | |
Hive | |
无结构化数据存储 | TxtFile |
FTP | |
HDFS | |
ElasticSearch 支持读不支持写 |
3. 设计理念
为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。
对于使用者,只需要学习DataX的数据源配置方式就可以将数据源里面的数据进行传输
4. DataX框架设计
DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中
4.1. Reader
数据采集模块,负责采集数据源的数据,将数据发送给Framework
4.2. Writer
数据写入模块,负责不断向Framework取数据, 并将数据写入到目的端
4.3. Framework
用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,控流,并发,数据转换等核心技术问题
5. DataX的运行流程
DataX采集全量数据的原理是通过sql查询的方式获取数据,查询语句会被切割,例如 通过时间查询,按照时间维度将数据切分成多个Task, DataX在传输数据的时候将启动TaskGroup,每个TaskGroup负责一定的并发度运行其所得的Task,单个TaskGroup的并发的固定为5,总TaskGroup数量与配置的总并发度有关: TaskGroup数量 = 总并发度 / 5
6. DataX与Sqoop对比
功能 | DataX | Sqoop |
运行模式 | 单进程多线程 | MR |
分布式 | 不支持,可以通过调度系统规避 | 支持 |
控流 | 有 | 需要定制开发 |
统计信息 | 已有一些统计,上报需要定制 | 没有, 分布式数据数据不方便 |
数据校验 | 在core部分与校验功能 | 没有, 分布式收集数据不方便 |
监控 | 需要定制 | 需要定制 |
7. 部署
下载DataX安装包到服务器解压并测试运行
python <datax_home>/bin/datax.py <datax_home>/job/job.json
8. 配置详解
可以使用如下命名查看DataX配置文件模板
python bin/datax.py -r mysqlreader -w hdfswriter
配置文件模板如下,json最外层是一个job,job包含setting和content两部分,其中setting用于对整个job进行配置,content用户配置数据源和目的地
Reader和Writer的具体参数可参考官方文档,点击查看
9. 案例 同步MySql到HDFS
9.1. 整体结构
"job":
"content": [
"reader": ,
"writer":
],
"setting":
"speed":
"channel": 1
9.2. mySqlReader
9.2.1. 使用tableMode
"name": "mysqlreader", //Reader名称 , 固定写法
"parameter":
"username": "root", //数据库用户密码
"password": "123456",
"connection": [
"jdbcUrl": ["jdbc:mysql://node1:3306/gmall"],//数据库jdbc url
"table": ["tb"] //数据库jdbc url
],
"column": ["id", "name", "age"], //同步的字段 ["*"]表示所有字段
"where": "id>=0", //while过滤条件
"splitPk": "" //分片字段,如果没有这个字段,或者值为空,则只有一个Task
9.2.2. 使用QuerySQLMode
"name": "mysqlreader", //Reader名称 , 固定写法
"parameter":
"username": "root", //数据库用户密码
"password": "123456",
"connection": [
"jdbcUrl": ["jdbc:mysql://node1:3306/gmall"],//数据库jdbc url
"table": ["select * from tb"] //数据库jdbc url
]
9.3. HDFSWriter
"name": "hdfswriter", //Writer名称 , 固定写法
"parameter":
"column": [ //列信息,包括列名和类型 类型为Hive表字段类型,目前不支持decimal,binary,arrays,maps,struicts
"name": "id",
"type": "bigint"
,
"name": "name",
"type": "string"
,
"name": "age",
"type": "bigint"
],
"defaultFS": "hdfs://node2:8020", //HDFS文件系统namenode节点地址,不支持传HA的集群名
"path": "/mydatax", //HDFS文件系统目标路径
"fileName": "tb", //HDFS文件名前缀
"fileType": "text", //HDFS文件类型
"compress": "gzip", //HDFS压缩类型 text文件支持压缩gzip bzip2; orc文件支持压缩NONE SNAPPY
"fieldDelimiter": "\\t", //HDFS的分隔符
"writeMode": "append" //数据写入的模式 append 追加 ; nonConflict: 若写入目录有同名(前缀相同文件),报错
注意:
HFDS Writer并未提供nullFormat参数:也就是用户并不能自定义null值写到HFDS文件中的存储格式。默认情况下,HFDS Writer会将null值存储为空字符串(''),而Hive默认的null值存储格式为\\N。所以后期将DataX同步的文件导入Hive表就会出现问题。
解决方法有两种,任意一种都可以
9.3.1. 在Hive中建表的时候指定表( NULL DEFINED AS '' )
9.3.2. 修改源码 点击参考
9.4. setting
"speed": //传输速度配置
"channel": 1 //并发数
,
"errorLimit": // 容错比例配置
"record": 1, //错误条数上限,超出则任务失败
"percentage": 0.02 //错误比例上限,超出则任务失败
9.5. 完整配置示例
"job":
"content": [
"reader":
"name": "mysqlreader",
"parameter":
"username": "root",
"password": "123456",
"connection": [
"jdbcUrl": ["jdbc:mysql://node1:3306/yangxp"],
"table": ["tb"]
],
"column": ["id", "name", "age"],
"where": "id>=0",
"splitPk": ""
,
"writer":
"name": "hdfswriter",
"parameter":
"column": [
"name": "id",
"type": "bigint"
,
"name": "name",
"type": "string"
,
"name": "age",
"type": "bigint"
],
"defaultFS": "hdfs://node2:8020",
"path": "/mydatax",
"fileName": "tb",
"fileType": "text",
"compress": "gzip",
"fieldDelimiter": "\\t",
"writeMode": "append"
],
"setting":
"speed":
"channel": 1
,
"errorLimit":
"record": 1,
"percentage": 0.02
9.6. 模拟传输
9.6.1. 启动hadoop和Hive创建Hive表
DROP TABLE IF EXISTS tb;
CREATE EXTERNAL TABLE tb
(
`id` STRING COMMENT '编号',
`name` STRING COMMENT '姓名',
`age` STRING COMMENT '年龄'
) COMMENT '年龄表'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'
NULL DEFINED AS ''
LOCATION '/mydatax/';
9.6.2. 启动传输任务
将配置内容写入mysql_to_hive.json配置文件到目录<datax_home>/job/下并启动任务
python <datax_home>/bin/datax.py <datax_home>/job/mysql_to_hive.json
9.6.3. 查看数据
数据被压缩后上传到Hive,直接在看会乱码
可以借助hive客户端查看表数据,或者使用hdfs的zcat查看
hdfs dfs -cat /mydatax/tb__af7eac41_a69e_4976_bb68_a98cd5d3a689.gz|zcat
9.7. DataX传参
通常情况下,离线数据同步任务需要每日定时重复执行,故HDFS上的目标路径通常会包含一层日期,以对每日同步的数据加以区分,也就是说每日同步数据的目标路径不是固定不变的,因此DataX配置文件中HDFS Writer的path参数的值应该是动态的。为实现这一效果,就需要使用DataX传参的功能。
DataX传参的用法如下,在JSON配置文件中使用$param引用参数,在提交任务时使用-p"-Dparam=value"传入参数值,具体示例如下。
"name": "hdfswriter",
"parameter":
...
"path": "/mydatax/$dt",
...
python <datax_home>/bin/datax.py -p"-Ddt=2023-03-04" <datax_home>/job/mysql_to_hdfs.json
10. DataX参数优化
10.1. 速度控制
DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在数据库可以承受的范围内达到最佳的同步速度。
参数 | 说明 |
job.setting.speed.channel | 并发数 |
job.setting.speed.record | 总record限速 |
job.setting.speed.byte | 总byte限速 |
core.transport.channel.speed.record | 单个channel的record限速,默认为10000(10000条/s) |
core.transport.channel.speed.byte | 单个channel的byte限速,默认值1024*1024(1M/s) |
注意事项:
- 1.若配置了总record限速,则必须配置单个channel的record限速
- 2.若配置了总byte限速,则必须配置单个channe的byte限速
- 3.若配置了总record限速和总byte限速,channel并发数参数就会失效。因为配置了总record限速和总byte限速之后,实际channel并发数是通过计算得到的:
计算公式为:
min(总byte限速/单个channel的byte限速,总record限速/单个channel的record限速)
配置示例:
"core":
"transport":
"channel":
"speed":
"byte": 1048576 //单个channel byte限速1M/s
,
"job":
"setting":
"speed":
"byte" : 5242880 //总byte限速5M/s
,
...
10.2. 内存调整
当提升DataX Job内Channel并发数时,内存的占用会显著增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。例如Channel中会有一个Buffer,作为临时的数据交换的缓冲区,而在部分Reader和Writer的中,也会存在一些Buffer,为了防止OOM等错误,需调大JVM的堆内存。
建议将内存设置为4G或者8G,这个也可以根据实际情况来调整。
调整JVM xms xmx参数的两种方式:一种是直接更改datax.py脚本;另一种是在启动的时候,加上对应的参数,如下:
python <datax_home>/bin/datax.py --jvm="-Xms8G -Xmx8G" <datax_home>/job/mysql_to_hdfs..json
以上是关于DataX入门的主要内容,如果未能解决你的问题,请参考以下文章