DataX
Posted by shengyang17
1. What is DataX
DataX is an open-source offline data synchronization tool from Alibaba. It provides stable and efficient data synchronization between a wide range of heterogeneous data sources, including relational databases (MySQL, Oracle, etc.), HDFS, Hive, ODPS, HBase, and FTP.
2. The Design of DataX
To solve the problem of synchronizing heterogeneous data sources, DataX replaces the complex mesh of point-to-point synchronization links with a star-shaped topology, in which DataX acts as the central transport hub connecting every data source. To integrate a new data source, you only need to connect it to DataX; it can then synchronize seamlessly with every data source that is already supported.
3. Framework Design
As an offline data synchronization framework, DataX is built on a Framework + plugin architecture. Reading from and writing to data sources are abstracted as Reader and Writer plugins that are incorporated into the synchronization framework.
Reader: the data collection module. It reads data from the source and hands it to the Framework.
Writer: the data writing module. It continuously pulls data from the Framework and writes it to the destination.
Framework: connects the Reader and the Writer, acts as the data transfer channel between them, and handles core concerns such as buffering, flow control, concurrency, and data conversion.
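This architecture is mirrored directly in every DataX job configuration: the job JSON always contains a reader block, a writer block, and a setting block used by the Framework for concurrency and flow control. A minimal skeleton (the plugin names here are placeholders, not a runnable job) looks like this:

{
    "job": {
        "content": [
            {
                "reader": { "name": "somereader", "parameter": { } },
                "writer": { "name": "somewriter", "parameter": { } }
            }
        ],
        "setting": {
            "speed": { "channel": 1 }
        }
    }
}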
4. How a Job Runs
1) A single data synchronization job in DataX is called a Job. When DataX receives a Job, it starts one process to carry out the whole synchronization. The DataX Job module is the central management node of a single job: it handles data cleanup, sub-task splitting (turning the single job into multiple Tasks), TaskGroup management, and so on.
2) After the DataX Job starts, it splits the Job into multiple small Tasks (sub-tasks) according to the splitting strategy of the source, so that they can run concurrently. A Task is the smallest unit of a DataX job; each Task synchronizes a portion of the data.
3) After the Tasks are split, the DataX Job calls the Scheduler module, which regroups the Tasks into TaskGroups according to the configured concurrency. Each TaskGroup runs all of its assigned Tasks at a given concurrency; by default a single TaskGroup runs 5 Tasks concurrently.
4) Each Task is started by its TaskGroup. Once started, a Task runs a fixed Reader -> Channel -> Writer thread pipeline to perform the synchronization.
5) Once the job is running, the Job module monitors and waits for all TaskGroups to finish. When every TaskGroup has completed, the Job exits successfully; otherwise it exits abnormally with a non-zero exit code.
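For example, following the scheduling example in the DataX documentation: a job configured with 20 concurrent channels that is split into 100 Tasks is assigned 20 / 5 = 4 TaskGroups, and each TaskGroup runs 25 of those 100 Tasks with a concurrency of 5.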
5. Quick Start
Prerequisites
- Linux
- JDK (1.8 or later; 1.8 recommended)
- Python (Python 2.6.x recommended)
Installation
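If the package has not been downloaded yet, the quick-start tarball can be fetched from the address given in the official DataX documentation (check the DataX README in case the link has changed):

wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz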
1) Upload the downloaded datax.tar.gz to /opt/software on hadoop101
[[email protected] software]$ ls
datax.tar.gz
2) Extract datax.tar.gz into /opt/module
[[email protected] software]$ tar -zxvf datax.tar.gz -C /opt/module/
3) Run the self-check script
[[email protected] bin]$ cd /opt/module/datax/bin/
[[email protected] bin]$ python datax.py /opt/module/datax/job/job.json
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

2019-07-14 07:51:25.661 [main] INFO VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2019-07-14 07:51:25.676 [main] INFO Engine - the machine info =>
    osInfo:  Oracle Corporation 1.8 25.144-b01
    jvmInfo: Linux amd64 2.6.32-642.el6.x86_64
    cpu num: 8
    totalPhysicalMemory: -0.00G
    freePhysicalMemory: -0.00G
    maxFileDescriptorCount: -1
    currentOpenFileDescriptorCount: -1
    GC Names [PS MarkSweep, PS Scavenge]

    MEMORY_NAME            | allocation_size | init_size
    PS Eden Space          | 256.00MB        | 256.00MB
    Code Cache             | 240.00MB        | 2.44MB
    Compressed Class Space | 1,024.00MB      | 0.00MB
    PS Survivor Space      | 42.50MB         | 42.50MB
    PS Old Gen             | 683.00MB        | 683.00MB
    Metaspace              | -0.00MB         | 0.00MB

2019-07-14 07:51:25.708 [main] INFO Engine -
{
    "content":[
        {
            "reader":{
                "name":"streamreader",
                "parameter":{
                    "column":[
                        {"type":"string","value":"DataX"},
                        {"type":"long","value":19890604},
                        {"type":"date","value":"1989-06-04 00:00:00"},
                        {"type":"bool","value":true},
                        {"type":"bytes","value":"test"}
                    ],
                    "sliceRecordCount":100000
                }
            },
            "writer":{
                "name":"streamwriter",
                "parameter":{
                    "encoding":"UTF-8",
                    "print":false
                }
            }
        }
    ],
    "setting":{
        "errorLimit":{
            "percentage":0.02,
            "record":0
        },
        "speed":{
            "byte":10485760
        }
    }
}

2019-07-14 07:51:35.892 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: /opt/module/datax/hook
2019-07-14 07:51:35.896 [job-0] INFO JobContainer -
    [total cpu info] =>
    averageCpu | maxDeltaCpu | minDeltaCpu
    -1.00%     | -1.00%      | -1.00%

    [total gc info] =>
    NAME         | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
    PS MarkSweep | 0            | 0               | 0               | 0.000s      | 0.000s         | 0.000s
    PS Scavenge  | 0            | 0               | 0               | 0.000s      | 0.000s         | 0.000s

2019-07-14 07:51:35.897 [job-0] INFO JobContainer - PerfTrace not enable!
2019-07-14 07:51:35.898 [job-0] INFO StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.018s | All Task WaitReaderTime 0.047s | Percentage 100.00%
2019-07-14 07:51:35.899 [job-0] INFO JobContainer -
Job start time            : 2019-07-14 07:51:25
Job end time              : 2019-07-14 07:51:35
Total elapsed time        : 10s
Average throughput        : 253.91KB/s
Record write speed        : 10000rec/s
Total records read        : 100000
Total read/write failures : 0
6. Usage Examples
① Read data from a stream and print it to the console
1) View the configuration template
[[email protected] bin]$ python datax.py -r streamreader -w streamwriter
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

Please refer to the streamreader document:
    https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md

Please refer to the streamwriter document:
    https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md

Please save the following configuration as a json file and use
    python DATAX_HOME/bin/datax.py JSON_FILE_NAME.json
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [],
                        "sliceRecordCount": ""
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
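A filled-in configuration based on this template (the values below are illustrative, modeled on the self-check job; the file name is arbitrary) could be saved as, say, job/stream2stream.json and run with python bin/datax.py job/stream2stream.json:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [
                            {"type": "string", "value": "DataX"},
                            {"type": "long", "value": 19890604}
                        ],
                        "sliceRecordCount": 10
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "UTF-8",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}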
② Importing and exporting with MySQL
Read data from MySQL and store it in HDFS
View the official template
[[email protected] job]$ python /opt/module/datax/bin/datax.py -r mysqlreader -w hdfswriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

Please refer to the mysqlreader document:
    https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

Please refer to the hdfswriter document:
    https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md

Please save the following configuration as a json file and use
    python DATAX_HOME/bin/datax.py JSON_FILE_NAME.json
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [],
                        "connection": [
                            {
                                "jdbcUrl": [],
                                "table": []
                            }
                        ],
                        "password": "",
                        "username": "",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [],
                        "compress": "",
                        "defaultFS": "",
                        "fieldDelimiter": "",
                        "fileName": "",
                        "fileType": "",
                        "path": "",
                        "writeMode": ""
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
[[email protected] job]$ vim mysql2hdfs
"job": "content": [ "reader": "name": "mysqlreader", "parameter": "column": [ "id", "name" ], "connection": [ "jdbcUrl": [ "jdbc:mysql://hadoop101:3306/test" ], "table": [ "stu1" ] ], "username": "root", "password": "123456" , "writer": "name": "hdfswriter", "parameter": "column": [ "name": "id", "type": "INT" , "name": "name", "type": "STRING" ], "defaultFS": "hdfs://hadoop101:9000", "fieldDelimiter": "\\t", "fileName": "student.txt", "fileType": "text", "path": "/", "writeMode": "append" ], "setting": "speed": "channel": "2"
2019-07-14 09:02:55.924 [job-0] INFO JobContainer -
    [total cpu info] =>
    averageCpu | maxDeltaCpu | minDeltaCpu
    -1.00%     | -1.00%      | -1.00%

    [total gc info] =>
    NAME         | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
    PS MarkSweep | 1            | 1               | 1               | 0.071s      | 0.071s         | 0.071s
    PS Scavenge  | 1            | 1               | 1               | 0.072s      | 0.072s         | 0.072s

2019-07-14 09:02:55.924 [job-0] INFO JobContainer - PerfTrace not enable!
2019-07-14 09:02:55.925 [job-0] INFO StandAloneJobContainerCommunicator - Total 3 records, 30 bytes | Speed 3B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2019-07-14 09:02:55.927 [job-0] INFO JobContainer -
Job start time            : 2019-07-14 09:02:43
Job end time              : 2019-07-14 09:02:55
Total elapsed time        : 12s
Average throughput        : 3B/s
Record write speed        : 0rec/s
Total records read        : 3
Total read/write failures : 0
Note: when HdfsWriter actually runs, it appends a random suffix to the configured file name; that is the actual file name each writing thread writes to.
View the official mongodbreader -> hdfswriter template (used for the MongoDB case in ③ below)

[[email protected] job]$ python /opt/module/datax/bin/datax.py -r mongodbreader -w hdfswriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

Please refer to the mongodbreader document:
    https://github.com/alibaba/DataX/blob/master/mongodbreader/doc/mongodbreader.md

Please refer to the hdfswriter document:
    https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md

Please save the following configuration as a json file and use
    python DATAX_HOME/bin/datax.py JSON_FILE_NAME.json
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mongodbreader",
                    "parameter": {
                        "address": [],
                        "collectionName": "",
                        "column": [],
                        "dbName": "",
                        "userName": "",
                        "userPassword": ""
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [],
                        "compress": "",
                        "defaultFS": "",
                        "fieldDelimiter": "",
                        "fileName": "",
                        "fileType": "",
                        "path": "",
                        "writeMode": ""
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
Read data from HDFS and write it to MySQL
Rename the file written to HDFS in the previous case
[[email protected] datax]$ hadoop fs -mv /student.txt* /student.txt
View the official template
[[email protected] datax]$ python bin/datax.py -r hdfsreader -w mysqlwriter
Create the configuration file
"job": "content": [ "reader": "name": "hdfsreader", "parameter": "column": ["*"], "defaultFS": "hdfs://hadoop102:9000", "encoding": "UTF-8", "fieldDelimiter": "\\t", "fileType": "text", "path": "/student.txt" , "writer": "name": "mysqlwriter", "parameter": "column": [ "id", "name" ], "connection": [ "jdbcUrl": "jdbc:mysql://hadoop102:3306/datax", "table": ["student2"] ], "password": "000000", "username": "root", "writeMode": "insert" ], "setting": "speed": "channel": "1"
Create the student2 table in the MySQL database datax
mysql> use datax;
mysql> create table student2(id int,name varchar(20));
Run the job
[[email protected] datax]$ bin/datax.py job/hdfs2mysql.json
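To verify the result, you can query the target table:

mysql> select * from student2;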
③ DataX import and export cases
Read data from MongoDB and import it into HDFS
Write the configuration file
[[email protected] datax]$ vim job/mongdb2hdfs.json
"job": "content": [ "reader": "name": "mongodbreader", "parameter": "address": ["127.0.0.1:27017"], "collectionName": "atguigu", "column": [ "name":"name", "type":"string" , "name":"url", "type":"string" ], "dbName": "test", , "writer": "name": "hdfswriter", "parameter": "column": [ "name":"name", "type":"string" , "name":"url", "type":"string" ], "defaultFS": "hdfs://hadoop102:9000", "fieldDelimiter": "\\t", "fileName": "mongo.txt", "fileType": "text", "path": "/", "writeMode": "append" ], "setting": "speed": "channel": "1"
mongodbreader parameter reference
address: the MongoDB address information; because MongoDB may be deployed as a cluster, the ip:port entries must be supplied as a JSON array. [Required]
userName: the MongoDB user name. [Optional]
userPassword: the MongoDB password. [Optional]
collectionName: the MongoDB collection name. [Required]
column: the document fields to read from MongoDB. [Required]
name: the name of the column. [Required]
type: the type of the column. [Optional]
splitter: MongoDB supports array types but the DataX framework does not, so array values read from MongoDB are joined into a single string with this delimiter. [Optional]
Run the job
[[email protected] datax]$ bin/datax.py job/mongdb2hdfs.json
Read data from MongoDB and import it into MySQL
Create the table in MySQL
mysql> create table kris(name varchar(20),url varchar(20));
Write the DataX configuration file
[[email protected] datax]$ vim job/mongodb2mysql.json
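A possible mongodb2mysql.json, reusing the MongoDB source from the previous case and writing into the kris table created above (the jdbcUrl, database name, user, and password below are assumptions), might look like this:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mongodbreader",
                    "parameter": {
                        "address": ["127.0.0.1:27017"],
                        "collectionName": "atguigu",
                        "column": [
                            {"name": "name", "type": "string"},
                            {"name": "url", "type": "string"}
                        ],
                        "dbName": "test"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": ["name", "url"],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://hadoop102:3306/datax",
                                "table": ["kris"]
                            }
                        ],
                        "username": "root",
                        "password": "000000",
                        "writeMode": "insert"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

It would be run the same way as the other jobs, e.g. bin/datax.py job/mongodb2mysql.json, and the result can be checked with select * from kris; in MySQL.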