关于数据同步工具DataX部署

Posted DB架构

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了关于数据同步工具DataX部署相关的知识,希望对你有一定的参考价值。

1.DataX简介

1.1 DataX概述

DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(mysql、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

源码地址:GitHub - alibaba/DataX: DataX是阿里云DataWorks数据集成的开源版本。

1.2 DataX支持的数据源

 DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下图。

 

类型

数据源

Reader(读)

Writer(写)

RDBMS 关系型数据库

MySQL

Oracle

OceanBase

SQLServer

PostgreSQL

DRDS

通用RDBMS

阿里云数仓数据存储

ODPS

ADS

OSS

OCS

NoSQL数据存储

OTS

Hbase0.94

Hbase1.1

Phoenix4.x

Phoenix5.x

MongoDB

Hive

Cassandra

无结构化数据存储

TxtFile

FTP

HDFS

Elasticsearch

时间序列数据库

OpenTSDB

TSDB

2. DataX 架构原理

2.1 DataX设计理念

 为了解决异构数据源同步问题DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

 

2.2 DataX框架设计

DataX本身作为离线数据同步框架采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

 2.3 DataX运行流程

下面用一个DataX作业生命周期的时序图说明DataX的运行流程、核心概念以及每个概念之间的关系。

 

 2.4 DataX调度决策思路

举例来说,用户提交了一个DataX作业,并且配置了总的并发度为20,目的是对一个有100张分表的mysql数据源进行同步。DataX的调度决策思路是:

1)DataX Job根据分库分表切分策略,将同步工作分成100个Task。

2)根据配置的总的并发度20,以及每个Task Group的并发度5,DataX计算共需要分配4个TaskGroup。

3)4个TaskGroup平分100个Task,每一个TaskGroup负责运行25个Task。

2.5 DataX与Sqoop对比

功能

DataX

Sqoop

运行模式

单进程多线程

MR

分布式

不支持,可以通过调度系统规避

支持

流控

有流控功能

需要定制

统计信息

已有一些统计,上报需定制

没有,分布式的数据收集不方便

数据校验

core部分有校验功能

没有,分布式的数据收集不方便

监控

需要定制

需要定制

3. DataX 部署

1)下载DataX安装包并上传到hadoop102的/opt/software

 

下载地址:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

2)解压datax.tar.gz到/opt/module

[maxwell@hadoop102 software]$ tar -zxvf datax.tar.gz -C /opt/module/

 3)自检,执行如下命令

[maxwell@hadoop102 ~]$ python /opt/module/datax/bin/datax.py /opt/module/datax/job/job.json

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


2023-03-28 12:56:58.652 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2023-03-28 12:56:58.669 [main] INFO  Engine - the machine info  => 

        osInfo: Oracle Corporation 1.8 25.212-b10
        jvmInfo:        Linux amd64 3.10.0-862.el7.x86_64
        cpu num:        2

        totalPhysicalMemory:    -0.00G
        freePhysicalMemory:     -0.00G
        maxFileDescriptorCount: -1
        currentOpenFileDescriptorCount: -1

        GC Names        [PS MarkSweep, PS Scavenge]

        MEMORY_NAME                    | allocation_size                | init_size                      
        PS Eden Space                  | 256.00MB                       | 256.00MB                       
        Code Cache                     | 240.00MB                       | 2.44MB                         
        Compressed Class Space         | 1,024.00MB                     | 0.00MB                         
        PS Survivor Space              | 42.50MB                        | 42.50MB                        
        PS Old Gen                     | 683.00MB                       | 683.00MB                       
        Metaspace                      | -0.00MB                        | 0.00MB                         


2023-03-28 12:56:58.712 [main] INFO  Engine - 

        "content":[
                
                        "reader":
                                "name":"streamreader",
                                "parameter":
                                        "column":[
                                                
                                                        "type":"string",
                                                        "value":"DataX"
                                                ,
                                                
                                                        "type":"long",
                                                        "value":19890604
                                                ,
                                                
                                                        "type":"date",
                                                        "value":"1989-06-04 00:00:00"
                                                ,
                                                
                                                        "type":"bool",
                                                        "value":true
                                                ,
                                                
                                                        "type":"bytes",
                                                        "value":"test"
                                                
                                        ],
                                        "sliceRecordCount":100000
                                
                        ,
                        "writer":
                                "name":"streamwriter",
                                "parameter":
                                        "encoding":"UTF-8",
                                        "print":false
                                
                        
                
        ],
        "setting":
                "errorLimit":
                        "percentage":0.02,
                        "record":0
                ,
                "speed":
                        "byte":10485760
                
        


2023-03-28 12:56:58.775 [main] WARN  Engine - prioriy set to 0, because NumberFormatException, the value is: null
2023-03-28 12:56:58.777 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2023-03-28 12:56:58.777 [main] INFO  JobContainer - DataX jobContainer starts job.
2023-03-28 12:56:58.780 [main] INFO  JobContainer - Set jobId = 0
2023-03-28 12:56:58.860 [job-0] INFO  JobContainer - jobContainer starts to do prepare ...
2023-03-28 12:56:58.861 [job-0] INFO  JobContainer - DataX Reader.Job [streamreader] do prepare work .
2023-03-28 12:56:58.861 [job-0] INFO  JobContainer - DataX Writer.Job [streamwriter] do prepare work .
2023-03-28 12:56:58.861 [job-0] INFO  JobContainer - jobContainer starts to do split ...
2023-03-28 12:56:58.873 [job-0] INFO  JobContainer - Job set Max-Byte-Speed to 10485760 bytes.
2023-03-28 12:56:58.874 [job-0] INFO  JobContainer - DataX Reader.Job [streamreader] splits to [1] tasks.
2023-03-28 12:56:58.874 [job-0] INFO  JobContainer - DataX Writer.Job [streamwriter] splits to [1] tasks.
2023-03-28 12:56:58.908 [job-0] INFO  JobContainer - jobContainer starts to do schedule ...
2023-03-28 12:56:58.911 [job-0] INFO  JobContainer - Scheduler starts [1] taskGroups.
2023-03-28 12:56:58.919 [job-0] INFO  JobContainer - Running by standalone Mode.
2023-03-28 12:56:58.969 [taskGroup-0] INFO  TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2023-03-28 12:56:59.015 [taskGroup-0] INFO  Channel - Channel set byte_speed_limit to -1, No bps activated.
2023-03-28 12:56:59.015 [taskGroup-0] INFO  Channel - Channel set record_speed_limit to -1, No tps activated.
2023-03-28 12:56:59.051 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2023-03-28 12:56:59.152 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[106]ms
2023-03-28 12:56:59.153 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] completed it's tasks.
2023-03-28 12:57:09.074 [job-0] INFO  StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.041s |  All Task WaitReaderTime 0.060s | Percentage 100.00%
2023-03-28 12:57:09.074 [job-0] INFO  AbstractScheduler - Scheduler accomplished all tasks.
2023-03-28 12:57:09.074 [job-0] INFO  JobContainer - DataX Writer.Job [streamwriter] do post work.
2023-03-28 12:57:09.075 [job-0] INFO  JobContainer - DataX Reader.Job [streamreader] do post work.
2023-03-28 12:57:09.075 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.
2023-03-28 12:57:09.079 [job-0] INFO  HookInvoker - No hook invoked, because base dir not exists or is a file: /opt/module/datax/hook
2023-03-28 12:57:09.080 [job-0] INFO  JobContainer - 
         [total cpu info] => 
                averageCpu                     | maxDeltaCpu                    | minDeltaCpu                    
                -1.00%                         | -1.00%                         | -1.00%
                        

         [total gc info] => 
                 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime     
                 PS MarkSweep         | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s             
                 PS Scavenge          | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s             

2023-03-28 12:57:09.081 [job-0] INFO  JobContainer - PerfTrace not enable!
2023-03-28 12:57:09.081 [job-0] INFO  StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.041s |  All Task WaitReaderTime 0.060s | Percentage 100.00%
2023-03-28 12:57:09.082 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2023-03-28 12:56:58
任务结束时刻                    : 2023-03-28 12:57:09
任务总计耗时                    :                 10s
任务平均流量                    :          253.91KB/s
记录写入速度                    :          10000rec/s
读出记录总数                    :              100000
读写失败总数                    :                   0

[maxwell@hadoop102 ~]$ 

上述执行结果表明执行成功。

4. DataX使用

4.1 DataX使用概述

4.1.1 DataX任务提交命令

DataX的使用十分简单,用户只需根据自己同步数据的数据源和目的地选择相应的Reader和Writer,并将Reader和Writer的信息配置在一个json文件中,然后执行如下命令提交数据同步任务即可。

[maxwell@hadoop102 datax]$ python bin/datax.py path/to/your/job.json

4.2.2 DataX配置文件格式

可以使用如下命名查看DataX配置文件模板。

[maxwell@hadoop102 datax]$ python bin/datax.py -r mysqlreader -w hdfswriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the mysqlreader document:
     https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md 

Please refer to the hdfswriter document:
     https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md 
 
Please save the following configuration as a json file and  use
     python DATAX_HOME/bin/datax.py JSON_FILE_NAME.json 
to run the job.


    "job": 
        "content": [
            
                "reader": 
                    "name": "mysqlreader", 
                    "parameter": 
                        "column": [], 
                        "connection": [
                            
                                "jdbcUrl": [], 
                                "table": []
                            
                        ], 
                        "password": "", 
                        "username": "", 
                        "where": ""
                    
                , 
                "writer": 
                    "name": "hdfswriter", 
                    "parameter": 
                        "column": [], 
                        "compress": "", 
                        "defaultFS": "", 
                        "fieldDelimiter": "", 
                        "fileName": "", 
                        "fileType": "", 
                        "path": "", 
                        "writeMode": ""
                    
                
            
        ], 
        "setting": 
            "speed": 
                "channel": ""
            
        
    

[maxwell@hadoop102 datax]$

配置文件模板如下,json最外层是一个job,job包含setting和content两部分,其中setting用于对整个job进行配置,content用户配置数据源和目的地。

Reader和Writer的具体参数可参考官方文档,地址如下:

DataX/README.md at master · alibaba/DataX · GitHub

类型数据源Reader(读)Writer(写)文档
RDBMS 关系型数据库MySQL 、
Oracle 、
OceanBase 、
SQLServer 、
PostgreSQL 、
DRDS 、
Kingbase 、
通用RDBMS(支持所有关系型数据库) 、
阿里云数仓数据存储ODPS 、
ADB
ADS
OSS 、
OCS
Hologres
AnalyticDB For PostgreSQL
阿里云中间件datahub读 、写
SLS读 、写
阿里云图数据库GDB 、
NoSQL数据存储OTS 、
Hbase0.94 、
Hbase1.1 、
Phoenix4.x 、
Phoenix5.x 、
MongoDB 、
Cassandra 、
数仓数据存储StarRocks读 、
ApacheDoris
ClickHouse
Databend
Hive 、
kudu
selectdb
无结构化数据存储TxtFile 、
FTP 、
HDFS 、
Elasticsearch
时间序列数据库OpenTSDB
TSDB 、
TDengine 、

 

 4.2 同步MySQL数据到HDFS案例

案例要求:同步gmall数据库中base_province表数据到HDFS的/base_province目录

需求分析:要实现该功能,需选用MySQLReader和HDFSWriter,MySQLReader具有两种模式分别是TableMode和QuerySQLMode,前者使用table,column,where等属性声明需要同步的数据;后者使用一条SQL查询语句声明需要同步的数据。

4.2.1 MySQLReader之TableMode

1)编写配置文件

(1)创建配置文件base_province.json

[maxwell@hadoop102 ~]$ vim /opt/module/datax/job/base_province.json

(2)配置文件内容如下


    "job": 
        "content": [
            
                "reader": 
                    "name": "mysqlreader",
                    "parameter": 
                        "column": [
                            "id",
                            "name",
                            "region_id",
                            "area_code",
                            "iso_code",
                            "iso_3166_2"
                        ],
                        "where": "id>=3",
                        "connection": [
                            
                                "jdbcUrl": [
                                    "jdbc:mysql://hadoop102:3306/gmall"
                                ],
                                "table": [
                                    "base_province"
                                ]
                            
                        ],
                        "password": "XXXXXXX",
                        "splitPk": "",
                        "username": "root"
                    
                ,
                "writer": 
                    "name": "hdfswriter",
                    "parameter": 
                        "column": [
                            
                                "name": "id",
                                "type": "bigint"
                            ,
                            
                                "name": "name",
                                "type": "string"
                            ,
                            
                                "name": "region_id",
                                "type": "string"
                            ,
                            
                                "name": "area_code",
                                "type": "string"
                            ,
                            
                                "name": "iso_code",
                                "type": "string"
                            ,
                            
                                "name": "iso_3166_2",
                                "type": "string"
                            
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://hadoop102:8020",
                        "fieldDelimiter": "\\t",
                        "fileName": "base_province",
                        "fileType": "text",
                        "path": "/base_province",
                        "writeMode": "append"
                    
                
            
        ],
        "setting": 
            "speed": 
                "channel": 1
            
        
    

2)配置文件说明

(1)Reader参数说明

 

(2)Writer参数说明

 

注意事项:

HFDS Writer并未提供nullFormat参数:也就是用户并不能自定义null值写到HFDS文件中的存储格式。默认情况下,HFDS Writer会将null值存储为空字符串(''),而Hive默认的null值存储格式为\\N。所以后期将DataX同步的文件导入Hive表就会出现问题。

解决该问题的方案有两个:

一是修改DataX HDFS Writer的源码,增加自定义null值存储格式的逻辑,可参考记Datax3.0解决MySQL抽数到HDFSNULL变为空字符的问题_datax nullformat_谭正强的博客-CSDN博客

二是在Hive中建表时指定null值存储格式为空字符串(''),例如:

DROP TABLE IF EXISTS base_province;
CREATE EXTERNAL TABLE base_province
(
    `id`         STRING COMMENT '编号',
    `name`       STRING COMMENT '省份名称',
    `region_id`  STRING COMMENT '地区ID',
    `area_code`  STRING COMMENT '地区编码',
    `iso_code`   STRING COMMENT '旧版ISO-3166-2编码,供可视化使用',
    `iso_3166_2` STRING COMMENT '新版ios-3166-2编码,供可视化使用'
) COMMENT '省份表'
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'
    NULL DEFINED AS ''
    LOCATION '/base_province/';

(3)Setting参数说明

 3)提交任务

(1)在HDFS创建/base_province目录

使用DataX向HDFS同步数据时,需确保目标路径已存在

[maxwell@hadoop102 datax]$ hadoop fs -mkdir /base_province

(2)进入DataX根目录

[maxwell@hadoop102 datax]$ cd /opt/module/datax 

(3)执行如下命令

[maxwell@hadoop102 datax]$ python bin/datax.py job/base_province.json

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


2023-03-28 14:13:09.610 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2023-03-28 14:13:09.615 [main] INFO  Engine - the machine info  => 

        osInfo: Oracle Corporation 1.8 25.212-b10
        jvmInfo:        Linux amd64 3.10.0-862.el7.x86_64
        cpu num:        2

        totalPhysicalMemory:    -0.00G
        freePhysicalMemory:     -0.00G
        maxFileDescriptorCount: -1
        currentOpenFileDescriptorCount: -1

        GC Names        [PS MarkSweep, PS Scavenge]

        MEMORY_NAME                    | allocation_size                | init_size                      
        PS Eden Space                  | 256.00MB                       | 256.00MB                       
        Code Cache                     | 240.00MB                       | 2.44MB                         
        Compressed Class Space         | 1,024.00MB                     | 0.00MB                         
        PS Survivor Space              | 42.50MB                        | 42.50MB                        
        PS Old Gen                     | 683.00MB                       | 683.00MB                       
        Metaspace                      | -0.00MB                        | 0.00MB                         


2023-03-28 14:13:09.632 [main] INFO  Engine - 

        "content":[
                
                        "reader":
                                "name":"mysqlreader",
                                "parameter":
                                        "column":[
                                                "id",
                                                "name",
                                                "region_id",
                                                "area_code",
                                                "iso_code",
                                                "iso_3166_2"
                                        ],
                                        "connection":[
                                                
                                                        "jdbcUrl":[
                                                                "jdbc:mysql://hadoop102:3306/gmall"
                                                        ],
                                                        "table":[
                                                                "base_province"
                                                        ]
                                                
                                        ],
                                        "password":"*********",
                                        "splitPk":"",
                                        "username":"root",
                                        "where":"id>=3"
                                
                        ,
                        "writer":
                                "name":"hdfswriter",
                                "parameter":
                                        "column":[
                                                
                                                        "name":"id",
                                                        "type":"bigint"
                                                ,
                                                
                                                        "name":"name",
                                                        "type":"string"
                                                ,
                                                
                                                        "name":"region_id",
                                                        "type":"string"
                                                ,
                                                
                                                        "name":"area_code",
                                                        "type":"string"
                                                ,
                                                
                                                        "name":"iso_code",
                                                        "type":"string"
                                                ,
                                                
                                                        "name":"iso_3166_2",
                                                        "type":"string"
                                                
                                        ],
                                        "compress":"gzip",
                                        "defaultFS":"hdfs://hadoop102:8020",
                                        "fieldDelimiter":"\\t",
                                        "fileName":"base_province",
                                        "fileType":"text",
                                        "path":"/base_province",
                                        "writeMode":"append"
                                
                        
                
        ],
        "setting":
                "speed":
                        "channel":1
                
        


2023-03-28 14:13:09.650 [main] WARN  Engine - prioriy set to 0, because NumberFormatException, the value is: null
2023-03-28 14:13:09.652 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2023-03-28 14:13:09.652 [main] INFO  JobContainer - DataX jobContainer starts job.
2023-03-28 14:13:09.654 [main] INFO  JobContainer - Set jobId = 0
2023-03-28 14:13:09.978 [job-0] INFO  OriginalConfPretreatmentUtil - Available jdbcUrl:jdbc:mysql://hadoop102:3306/gmall?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true.
2023-03-28 14:13:09.996 [job-0] INFO  OriginalConfPretreatmentUtil - table:[base_province] has columns:[id,name,region_id,area_code,iso_code,iso_3166_2].
Mar 28, 2023 2:13:10 PM org.apache.hadoop.util.NativeCodeLoader <clinit>
WARNING: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2023-03-28 14:13:11.825 [job-0] INFO  JobContainer - jobContainer starts to do prepare ...
2023-03-28 14:13:11.825 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] do prepare work .
2023-03-28 14:13:11.825 [job-0] INFO  JobContainer - DataX Writer.Job [hdfswriter] do prepare work .
2023-03-28 14:13:12.047 [job-0] INFO  HdfsWriter$Job - 由于您配置了writeMode append, 写入前不做清理工作, [/base_province] 目录下写入相应文件名前缀  [base_province] 的文件
2023-03-28 14:13:12.047 [job-0] INFO  JobContainer - jobContainer starts to do split ...
2023-03-28 14:13:12.047 [job-0] INFO  JobContainer - Job set Channel-Number to 1 channels.
2023-03-28 14:13:12.062 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] splits to [1] tasks.
2023-03-28 14:13:12.062 [job-0] INFO  HdfsWriter$Job - begin do split...
2023-03-28 14:13:12.075 [job-0] INFO  HdfsWriter$Job - splited write file name:[hdfs://hadoop102:8020/base_province__599ea3d1_6d79_44aa_9f44_4148f782a4f8/base_province__564114a7_fd6b_4598_a234_460255d27677]
2023-03-28 14:13:12.075 [job-0] INFO  HdfsWriter$Job - end do split.
2023-03-28 14:13:12.075 [job-0] INFO  JobContainer - DataX Writer.Job [hdfswriter] splits to [1] tasks.
2023-03-28 14:13:12.130 [job-0] INFO  JobContainer - jobContainer starts to do schedule ...
2023-03-28 14:13:12.194 [job-0] INFO  JobContainer - Scheduler starts [1] taskGroups.
2023-03-28 14:13:12.197 [job-0] INFO  JobContainer - Running by standalone Mode.
2023-03-28 14:13:12.224 [taskGroup-0] INFO  TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2023-03-28 14:13:12.256 [taskGroup-0] INFO  Channel - Channel set byte_speed_limit to -1, No bps activated.
2023-03-28 14:13:12.256 [taskGroup-0] INFO  Channel - Channel set record_speed_limit to -1, No tps activated.
2023-03-28 14:13:12.296 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2023-03-28 14:13:12.335 [0-0-0-reader] INFO  CommonRdbmsReader$Task - Begin to read record by Sql: [select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where (id>=3)
] jdbcUrl:[jdbc:mysql://hadoop102:3306/gmall?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2023-03-28 14:13:12.344 [0-0-0-writer] INFO  HdfsWriter$Task - begin do write...
2023-03-28 14:13:12.344 [0-0-0-writer] INFO  HdfsWriter$Task - write to file : [hdfs://hadoop102:8020/base_province__599ea3d1_6d79_44aa_9f44_4148f782a4f8/base_province__564114a7_fd6b_4598_a234_460255d27677]
2023-03-28 14:13:12.516 [0-0-0-reader] INFO  CommonRdbmsReader$Task - Finished read record by Sql: [select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where (id>=3)
] jdbcUrl:[jdbc:mysql://hadoop102:3306/gmall?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2023-03-28 14:13:13.387 [0-0-0-writer] INFO  HdfsWriter$Task - end do write
2023-03-28 14:13:13.461 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[1179]ms
2023-03-28 14:13:13.462 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] completed it's tasks.
2023-03-28 14:13:22.266 [job-0] INFO  StandAloneJobContainerCommunicator - Total 32 records, 667 bytes | Speed 66B/s, 3 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2023-03-28 14:13:22.266 [job-0] INFO  AbstractScheduler - Scheduler accomplished all tasks.
2023-03-28 14:13:22.267 [job-0] INFO  JobContainer - DataX Writer.Job [hdfswriter] do post work.
2023-03-28 14:13:22.267 [job-0] INFO  HdfsWriter$Job - start rename file [hdfs://hadoop102:8020/base_province__599ea3d1_6d79_44aa_9f44_4148f782a4f8/base_province__564114a7_fd6b_4598_a234_460255d27677.gz] to file [hdfs://hadoop102:8020/base_province/base_province__564114a7_fd6b_4598_a234_460255d27677.gz].
2023-03-28 14:13:22.317 [job-0] INFO  HdfsWriter$Job - finish rename file [hdfs://hadoop102:8020/base_province__599ea3d1_6d79_44aa_9f44_4148f782a4f8/base_province__564114a7_fd6b_4598_a234_460255d27677.gz] to file [hdfs://hadoop102:8020/base_province/base_province__564114a7_fd6b_4598_a234_460255d27677.gz].
2023-03-28 14:13:22.318 [job-0] INFO  HdfsWriter$Job - start delete tmp dir [hdfs://hadoop102:8020/base_province__599ea3d1_6d79_44aa_9f44_4148f782a4f8] .
2023-03-28 14:13:22.402 [job-0] INFO  HdfsWriter$Job - finish delete tmp dir [hdfs://hadoop102:8020/base_province__599ea3d1_6d79_44aa_9f44_4148f782a4f8] .
2023-03-28 14:13:22.402 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] do post work.
2023-03-28 14:13:22.402 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.
2023-03-28 14:13:22.403 [job-0] INFO  HookInvoker - No hook invoked, because base dir not exists or is a file: /opt/module/datax/hook
2023-03-28 14:13:22.505 [job-0] INFO  JobContainer - 
         [total cpu info] => 
                averageCpu                     | maxDeltaCpu                    | minDeltaCpu                    
                -1.00%                         | -1.00%                         | -1.00%
                        

         [total gc info] => 
                 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime     
                 PS MarkSweep         | 1                  | 1                  | 1                  | 0.039s             | 0.039s             | 0.039s             
                 PS Scavenge          | 1                  | 1                  | 1                  | 0.021s             | 0.021s             | 0.021s             

2023-03-28 14:13:22.505 [job-0] INFO  JobContainer - PerfTrace not enable!
2023-03-28 14:13:22.506 [job-0] INFO  StandAloneJobContainerCommunicator - Total 32 records, 667 bytes | Speed 66B/s, 3 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2023-03-28 14:13:22.507 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2023-03-28 14:13:09
任务结束时刻                    : 2023-03-28 14:13:22
任务总计耗时                    :                 12s
任务平均流量                    :               66B/s
记录写入速度                    :              3rec/s
读出记录总数                    :                  32
读写失败总数                    :                   0

[maxwell@hadoop102 datax]$ 

4)查看结果

(1)DataX打印日志

(2)查看HDFS文件

[maxwell@hadoop102 datax]$ hadoop fs -cat /base_province/* | zcat
2023-03-28 14:15:50,686 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
3       山西    1       140000  CN-14   CN-SX
4       内蒙古  1       150000  CN-15   CN-NM
5       河北    1       130000  CN-13   CN-HE
6       上海    2       310000  CN-31   CN-SH
7       江苏    2       320000  CN-32   CN-JS
8       浙江    2       330000  CN-33   CN-ZJ
9       安徽    2       340000  CN-34   CN-AH
10      福建    2       350000  CN-35   CN-FJ
11      江西    2       360000  CN-36   CN-JX
12      山东    2       370000  CN-37   CN-SD
14  

异构数据源离线同步工具之DataX的安装部署

异构数据源离线同步工具之DataX的安装部署

🍅程序员小王的博客:程序员小王的博客

🍅 欢迎点赞 👍 收藏 ⭐留言 📝

🍅 如有编辑错误联系作者,如果有比较好的文章欢迎分享给我,我会取其精华去其糟粕

🍅java自学的学习路线:java自学的学习路线

🍅该博客参考文献:阿里云DataX,DataX官网,尚硅谷大数据研究院

一、DataX概述

1、什么是DataX

DataX是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle,DB2 等)、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。在阿里巴巴集团内被广泛使用的离线数据同步工具

2、DataX的设计

   为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源,当需要接入一个新的数据源的时候,只需要将数据源对接到DataX,便能跟已有的数据源做到无缝数据同步了!

3、支持的数据源

DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下图,详情请点击:DataX数据源参考指南

类型数据源Reader(读)Writer(写)文档
RDBMS 关系型数据库MySQL
Oracle
OceanBase
SQLServer
PostgreSQL
DRDS
通用RDBMS(支持所有关系型数据库)
阿里云数仓数据存储ODPS
ADS
OSS
OCS
NoSQL数据存储OTS
Hbase0.94
Hbase1.1
Phoenix4.x
Phoenix5.x
MongoDB
Hive
Cassandra
无结构化数据存储TxtFile
FTP
HDFS
Elasticsearch
时间序列数据库OpenTSDB
TSDB

4、DataX框架设计

  • **Reader:**数据采集模块,负责采集数据源的数据,将数据发送给FrameWork

  • Writer:数据写入模块,负责不断向FrameWork取数据,并将数据写入到目的端

  • FrameWork:用于连接Reader,writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题

5、运行原理

(1)运行原理

  • Job:单个作业的管理节点,负责数据清洗,子任务划分,TaskGroup监控管理

  • **Task:**由Job切分而来,是DataX作业的最小单元,每个Task负责一部分数据的同步工作

  • Schedule(计划表):将Task组成TaskGroup,单个TaskGroup的并发量为5

  • **TaskGroup:**负责启动Task

(2)举例:

用户提交了一个DataX作业,并且配置20个并发,目的是将一个100张分表的mysql数据同步到ods里面。

DataX的调度决策思路是:

  • DataX的Job根据分库分表切分成100个Task。

  • 根据20个并发,DataX计算共需要分配4个TaskGroup

  • 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task.

6、当前现状

DataX在阿里巴巴集团内被广泛使用,承担了所有大数据的离线同步业务,并已持续稳定运行了6年之久。目前每天完成同步8w多道作业,每日传输数据量超过300TB。

7、DataX与Sqoop的对比

  Sqoop(发音:skup)是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql...)间进行数据的传递,可以将一个[关系型数据库](https://baike.baidu.com/item/关系型数据库/8999831)(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。
 

二、DataX

1、官方地址

下载地址:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com

源码地址:https://github.com/alibaba/DataX

2、前置条件

  • Linux(我使用的是阿里云)

阿里云参考我的另外一篇博客:阿里云部署javaWeb项目依赖软件(jdk、tomcat、Mariadb数据库)的安装_程序员小王的博客-CSDN博客

  • JDK(1.8 以上,推荐 1.8)

  • Python(使用的是 Python2.7.5X)

3、安装部署DataX

(1)访问官网下载安装包

(2)使用filezilla上传安装包到服务器(阿里云)datax节点

(3)解压安装包

  • 将安装到/root/datax/路径下

tar -zxvf datax.tar.gz
  • 安装成功后

(4)运行自检脚本测试(需要在bin下启动)

python datax.py /root/datax/datax/job/job.json

三、DataX实战案例之从 stream 流读取数据并打印到控制台

(1)查看配置模板

python datax.py -r streamreader -w streamwriter
[root@iZbp1bq6vb70qo4o5mrxdzZ bin]# python datax.py -r streamreader -w streamwriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the streamreader document:
     https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md 

Please refer to the streamwriter document:
     https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md 
 
Please save the following configuration as a json file and  use
     python DATAX_HOME/bin/datax.py JSON_FILE_NAME.json 
to run the job.


    "job": 
        "content": [
            
                "reader": 
                    "name": "streamreader", 
                    "parameter": 
                        "column": [], 
                        "sliceRecordCount": ""
                    
                , 
                "writer": 
                    "name": "streamwriter", 
                    "parameter": 
                        "encoding": "", 
                        "print": true
                    
                
            
        ], 
        "setting": 
            "speed": 
                "channel": ""
            
        
    

(2)根据模板编写配置文件

  • record:  出错记录数超过record设置的条数时,任务标记为失败.

  • percentage: 当出错记录数超过percentage百分数时,任务标记为失败.

  • channel表示任务并发数。

  • bytes表示每秒字节数,默认为0(不限速)。

-在job目录下创建文件:
vim stream2stream.json

  • 运行

-在datax执行命令:
[root@iZbp1bq6vb70qo4o5mrxdzZ datax]# python bin/datax.py job/stream2stream.json

(3)观察控制台输出结果

10      hello,你好,世界-DataX,我是王恒杰!
10      hello,你好,世界-DataX,我是王恒杰!
10      hello,你好,世界-DataX,我是王恒杰!
10      hello,你好,世界-DataX,我是王恒杰!
10      hello,你好,世界-DataX,我是王恒杰!
10      hello,你好,世界-DataX,我是王恒杰!
10      hello,你好,世界-DataX,我是王恒杰!
10      hello,你好,世界-DataX,我是王恒杰!
10      hello,你好,世界-DataX,我是王恒杰!
10      hello,你好,世界-DataX,我是王恒杰!
10      hello,你好,世界-DataX,我是王恒杰!
10      hello,你好,世界-DataX,我是王恒杰!
10      hello,你好,世界-DataX,我是王恒杰!
10      hello,你好,世界-DataX,我是王恒杰!
10      hello,你好,世界-DataX,我是王恒杰!
10      hello,你好,世界-DataX,我是王恒杰!

四、最后的话

 因为DataX内容较多,预计将分五部份完成DataX博客的专栏,分别是
  • 【1】异构数据源离线同步工具之DataX的安装部署

  • 【2】异构数据源离线同步工具之DataX的可视化工具DataX-Web

  • 【3】读取 MySQL 中的数据Oracle 数据库

  • 【4】读取 Oracle中的数据Mysql数据库

  • 【5】读取 Oracle中的数据DB2数据库

  • 【6】读取DB2中的数据Mysql数据库

以上是关于关于数据同步工具DataX部署的主要内容,如果未能解决你的问题,请参考以下文章

数据同步工具DataX和DataWeb知识手册,DataX优化

DATAX如何同步HBASE数据

[大数据技术]datax的安装以及使用

DataX及DataX-Web

数据同步工具DataX

datax传递多个参数到json