DataX

Posted shengyang17

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了DataX相关的知识,希望对你有一定的参考价值。

 

1.什么是DataX

?       DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(mysql、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

2. DataX的设计

为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

                                       技术图片

 

3.  框架设计

                                 技术图片

 

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

Reader:数据采集模块,负责采集数据源的数据,将数据发送给Framework。

Writer:数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。

Framework:用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

4.  运行原理

                                     技术图片

1)  DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。

2)  DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。

3)  切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。

4)  每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。

5)  DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

 5.快速入门

  前置要求
    - Linux
    - JDK(1.8以上,推荐1.8)
    - Python(推荐Python2.6.X)
   安装
    1)将下载好的datax.tar.gz上传到hadoop101的/opt/software
      [[email protected] software]$ ls
        datax.tar.gz
    2)解压datax.tar.gz到/opt/module
      [[email protected] software]$ tar -zxvf datax.tar.gz -C /opt/module/
    3)运行自检脚本
      [[email protected] bin]$ cd /opt/module/datax/bin/
      [[email protected] bin]$ python datax.py /opt/module/datax/job/job.json

[[email protected] bin]$ python datax.py /opt/module/datax/job/job.json 

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


2019-07-14 07:51:25.661 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2019-07-14 07:51:25.676 [main] INFO  Engine - the machine info  => 

        osInfo: Oracle Corporation 1.8 25.144-b01
        jvmInfo:        Linux amd64 2.6.32-642.el6.x86_64
        cpu num:        8

        totalPhysicalMemory:    -0.00G
        freePhysicalMemory:     -0.00G
        maxFileDescriptorCount: -1
        currentOpenFileDescriptorCount: -1

        GC Names        [PS MarkSweep, PS Scavenge]

        MEMORY_NAME                    | allocation_size                | init_size                      
        PS Eden Space                  | 256.00MB                       | 256.00MB                       
        Code Cache                     | 240.00MB                       | 2.44MB                         
        Compressed Class Space         | 1,024.00MB                     | 0.00MB                         
        PS Survivor Space              | 42.50MB                        | 42.50MB                        
        PS Old Gen                     | 683.00MB                       | 683.00MB                       
        Metaspace                      | -0.00MB                        | 0.00MB                         


2019-07-14 07:51:25.708 [main] INFO  Engine - 

        "content":[
                
                        "reader":
                                "name":"streamreader",
                                "parameter":
                                        "column":[
                                                
                                                        "type":"string",
                                                        "value":"DataX"
                                                ,
                                                
                                                        "type":"long",
                                                        "value":19890604
                                                ,
                                                
                                                        "type":"date",
                                                        "value":"1989-06-04 00:00:00"
                                                ,
                                                
                                                        "type":"bool",
                                                        "value":true
                                                ,
                                                
                                                        "type":"bytes",
                                                        "value":"test"
                                                
                                        ],
                                        "sliceRecordCount":100000
                                
                        ,
                        "writer":
                                "name":"streamwriter",
                                "parameter":
                                        "encoding":"UTF-8",
                                        "print":false
                                
                        
                
        ],
        "setting":
                "errorLimit":
                        "percentage":0.02,
                        "record":0
                ,
                "speed":
                        "byte":10485760
                
        



2019-07-14 07:51:35.892 [job-0] INFO  HookInvoker - No hook invoked, because base dir not exists or is a file: /opt/module/datax/hook
2019-07-14 07:51:35.896 [job-0] INFO  JobContainer - 
         [total cpu info] => 
                averageCpu                     | maxDeltaCpu                    | minDeltaCpu                    
                -1.00%                         | -1.00%                         | -1.00%
                        

         [total gc info] => 
                 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime     
                 PS MarkSweep         | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s             
                 PS Scavenge          | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s             

2019-07-14 07:51:35.897 [job-0] INFO  JobContainer - PerfTrace not enable!
2019-07-14 07:51:35.898 [job-0] INFO  StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.018s |  All Task WaitReaderTime 0.047s | Percentage 100.00%
2019-07-14 07:51:35.899 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2019-07-14 07:51:25
任务结束时刻                    : 2019-07-14 07:51:35
任务总计耗时                    :                 10s
任务平均流量                    :          253.91KB/s
记录写入速度                    :          10000rec/s
读出记录总数                    :              100000
读写失败总数                    :                   0

6. 使用案例

①从stream流读取数据并打印到控制台

1)查看配置模板

[[email protected] bin]$ python datax.py -r streamreader -w streamwriter
技术图片
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

Please refer to the streamreader document:
     https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md 

Please refer to the streamwriter document:
     https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md 
 
Please save the following configuration as a json file and  use
     python DATAX_HOME/bin/datax.py JSON_FILE_NAME.json 
to run the job.


    "job": 
        "content": [
            
                "reader": 
                    "name": "streamreader", 
                    "parameter": 
                        "column": [], 
                        "sliceRecordCount": ""
                    
                , 
                "writer": 
                    "name": "streamwriter", 
                    "parameter": 
                        "encoding": "", 
                        "print": true
                    
                
            
        ], 
        "setting": 
            "speed": 
                "channel": ""
            
        
    
View Code

 

②从MySQL的导入和导出

读取MySQL中的数据存放到HDFS

    查看官方模板

技术图片
[[email protected] job]$ python /opt/module/datax/bin/datax.py -r mysqlreader -w hdfswriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the mysqlreader document:
     https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md 

Please refer to the hdfswriter document:
     https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md 
 
Please save the following configuration as a json file and  use
     python DATAX_HOME/bin/datax.py JSON_FILE_NAME.json 
to run the job.


    "job": 
        "content": [
            
                "reader": 
                    "name": "mysqlreader", 
                    "parameter": 
                        "column": [], 
                        "connection": [
                            
                                "jdbcUrl": [], 
                                "table": []
                            
                        ], 
                        "password": "", 
                        "username": "", 
                        "where": ""
                    
                , 
                "writer": 
                    "name": "hdfswriter", 
                    "parameter": 
                        "column": [], 
                        "compress": "", 
                        "defaultFS": "", 
                        "fieldDelimiter": "", 
                        "fileName": "", 
                        "fileType": "", 
                        "path": "", 
                        "writeMode": ""
                    
                
            
        ], 
        "setting": 
            "speed": 
                "channel": ""
            
        
    
View Code

技术图片

技术图片

 

[[email protected] job]$ vim mysql2hdfs 
技术图片

    "job": 
        "content": [
            
                "reader": 
                    "name": "mysqlreader", 
                    "parameter": 
                        "column": [
                            "id",
                            "name"
                        ], 
                        "connection": [
                            
                                "jdbcUrl": [
                                    "jdbc:mysql://hadoop101:3306/test"
                                ], 
                                "table": [
                                    "stu1"
                                ]
                            
                        ], 
                        "username": "root", 
                        "password": "123456"
                    
                , 
                "writer": 
                    "name": "hdfswriter", 
                    "parameter": 
                        "column": [
                            
                                "name": "id",
                                "type": "INT"
                            ,
                            
                                "name": "name",
                                "type": "STRING"
                            
                        ],  
                        "defaultFS": "hdfs://hadoop101:9000", 
                        "fieldDelimiter": "\\t", 
                        "fileName": "student.txt", 
                        "fileType": "text", 
                        "path": "/", 
                        "writeMode": "append"
                    
                
            
        ], 
        "setting": 
            "speed": 
                "channel": "2"
            
        
    
View Code

 

2019-07-14 09:02:55.924 [job-0] INFO  JobContainer - 
         [total cpu info] => 
                averageCpu                     | maxDeltaCpu                    | minDeltaCpu                    
                -1.00%                         | -1.00%                         | -1.00%
                        

         [total gc info] => 
                 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime     
                 PS MarkSweep         | 1                  | 1                  | 1                  | 0.071s             | 0.071s             | 0.071s             
                 PS Scavenge          | 1                  | 1                  | 1                  | 0.072s             | 0.072s             | 0.072s             

2019-07-14 09:02:55.924 [job-0] INFO  JobContainer - PerfTrace not enable!
2019-07-14 09:02:55.925 [job-0] INFO  StandAloneJobContainerCommunicator - Total 3 records, 30 bytes | Speed 3B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2019-07-14 09:02:55.927 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2019-07-14 09:02:43
任务结束时刻                    : 2019-07-14 09:02:55
任务总计耗时                    :                 12s
任务平均流量                    :                3B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   3
读写失败总数                    :                   0

 

 

技术图片

注意:HdfsWriter实际执行时会在该文件名后添加随机的后缀作为每个线程写入实际文件名。

 

[[email protected] job]$ python /opt/module/datax/bin/datax.py -r mongodbreader -w hdfswriter  
技术图片
[[email protected] job]$ python /opt/module/datax/bin/datax.py -r mongodbreader -w hdfswriter     

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the mongodbreader document:
     https://github.com/alibaba/DataX/blob/master/mongodbreader/doc/mongodbreader.md 

Please refer to the hdfswriter document:
     https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md 
 
Please save the following configuration as a json file and  use
     python DATAX_HOME/bin/datax.py JSON_FILE_NAME.json 
to run the job.


    "job": 
        "content": [
            
                "reader": 
                    "name": "mongodbreader", 
                    "parameter": 
                        "address": [], 
                        "collectionName": "", 
                        "column": [], 
                        "dbName": "", 
                        "userName": "", 
                        "userPassword": ""
                    
                , 
                "writer": 
                    "name": "hdfswriter", 
                    "parameter": 
                        "column": [], 
                        "compress": "", 
                        "defaultFS": "", 
                        "fieldDelimiter": "", 
                        "fileName": "", 
                        "fileType": "", 
                        "path": "", 
                        "writeMode": ""
                    
                
            
        ], 
        "setting": 
            "speed": 
                "channel": ""
            
        
    
View Code

读取HDFS数据写入MySQL

将上个案例上传的文件改名

  [[email protected] datax]$ hadoop fs -mv /student.txt* /student.txt

查看官方模板

  [[email protected] datax]$ python bin/datax.py -r hdfsreader -w mysqlwriter

创建配置文件

技术图片

    "job": 
        "content": [
            
                "reader": 
                    "name": "hdfsreader", 
                    "parameter": 
                        "column": ["*"], 
                        "defaultFS": "hdfs://hadoop102:9000", 
                        "encoding": "UTF-8", 
                        "fieldDelimiter": "\\t", 
                        "fileType": "text", 
                        "path": "/student.txt"
                    
                , 
                "writer": 
                    "name": "mysqlwriter", 
                    "parameter": 
                        "column": [
                            "id",
                            "name"
                        ], 
                        "connection": [
                            
                                "jdbcUrl": "jdbc:mysql://hadoop102:3306/datax", 
                                "table": ["student2"]
                            
                        ], 
                        "password": "000000", 
                        "username": "root", 
                        "writeMode": "insert"
                    
                
            
        ], 
        "setting": 
            "speed": 
                "channel": "1"
            
        
    
View Code

在MySQL的datax数据库中创建student2

  mysql> use test;

  mysql> create table stu1(id int,name varchar(20));

执行任务

  [[email protected] datax]$ bin/datax.py job/hdfs2mysql.json

③DataX导入导出案例

读取MongoDB的数据导入到HDFS

编写配置文件

[[email protected] datax]$ vim job/mongdb2hdfs.json
技术图片

    "job": 
        "content": [
            
                "reader": 
                    "name": "mongodbreader", 
                    "parameter": 
                        "address": ["127.0.0.1:27017"], 
                        "collectionName": "atguigu", 
                        "column": [
                            
                                "name":"name",
                                "type":"string"
                            ,
                            
                                "name":"url",
                                "type":"string"
                            
                        ], 
                        "dbName": "test", 
                    
                , 
                "writer": 
                    "name": "hdfswriter", 
                    "parameter": 
                        "column": [
                            
                                "name":"name",
                                "type":"string"
                            ,
                            
                                "name":"url",
                                "type":"string"
                            
                        ], 
                        "defaultFS": "hdfs://hadoop102:9000", 
                        "fieldDelimiter": "\\t", 
                        "fileName": "mongo.txt", 
                        "fileType": "text", 
                        "path": "/", 
                        "writeMode": "append"
                    
                
            
        ], 
        "setting": 
            "speed": 
                "channel": "1"
            
        
    
View Code

mongodbreader参数解析

  address: MongoDB的数据地址信息,因为MonogDB可能是个集群,则ip端口信息需要以Json数组的形式给出。【必填】

  userName:MongoDB的用户名。【选填】

  userPassword: MongoDB的密码。【选填】

  collectionName: MonogoDB的集合名。【必填】

  column:MongoDB的文档列名。【必填】

  name:Column的名字。【必填】

  type:Column的类型。【选填】

  splitter:因为MongoDB支持数组类型,但是Datax框架本身不支持数组类型,所以mongoDB读出来的数组类型要通过这个分隔符合并成字符串。【选填】

执行
[[email protected] datax]$ bin/datax.py job/mongdb2hdfs.json

读取MongoDB的数据导入MySQL

  在MySQL中创建表 mysql> create table kris(name varchar(20),url varchar(20));

编写DataX配置文件

  [[email protected] datax]$ vim job/mongodb2mysql.json

 

以上是关于DataX的主要内容,如果未能解决你的问题,请参考以下文章

Datax官方笔记总结

DataX 快速入门

DataX 快速入门

DataX及DataX-Web

阿里异构离线数据同步工具/平台DataX