DataX Quick Start

Posted @SmartSi

DataX version: 3.0

GitHub repository: https://github.com/alibaba/DataX

DataX is an offline synchronization tool for heterogeneous data sources. It aims to provide stable and efficient data synchronization between a wide range of heterogeneous data sources, including relational databases (MySQL, Oracle, etc.), HDFS, Hive, ODPS, HBase, and FTP. For details, see: DataX Offline Synchronization for Heterogeneous Data Sources.

1. Requirements

  • Linux
  • JDK (1.8 or later; 1.8 recommended)
  • Python (Python 2.6.x recommended)

2. Download

Download the DataX tool package directly: download link. After downloading, extract it to the local /opt directory:

tar -zxvf datax.tar.gz -C /opt/

Besides downloading the prebuilt package, you can also download the DataX source code and compile it yourself.

Enter the bin directory to run synchronization jobs. You can run the following self-check job to verify that the installation succeeded:

python YOUR_DATAX_HOME/bin/datax.py YOUR_DATAX_HOME/job/job.json

In our environment, that means running:

python /opt/datax/bin/datax.py /opt/datax/job/job.json

If the command finishes with a successful job summary, the installation works correctly.

3. Example

Our first simple example reads data from a Stream and prints it to the console.

3.1 View the Configuration Template

DataX job configuration files are written in JSON.

DataX provides a separate configuration template for each Reader and Writer. You can view a template by specifying the Reader and Writer with the following command:

python YOUR_DATAX_HOME/bin/datax.py -r YOUR_READER -w YOUR_WRITER

Here we run:

python /opt/datax/bin/datax.py -r streamreader -w streamwriter

The output contains the following configuration template JSON:


{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [],
                        "sliceRecordCount": ""
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

The template shows that a configuration file has three parts:

  • reader: configures the read side of the synchronization job
  • writer: configures the write side of the synchronization job
  • setting: configures job-wide settings (everything that is not reader- or writer-specific). In setting you can configure the synchronization rate: DataX 3.0 provides three flow-control modes, by channel (concurrency), by record count, and by byte count, so you can control job speed as needed. It also provides dirty-data detection, with precise filtering, identification, collection, and display of dirty records, and multiple dirty-data handling modes.
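The three flow-control modes map to keys under job.setting.speed. As a sketch, the snippet below assembles a rate-limited setting block; the "byte", "record", and "errorLimit" keys are the commonly documented DataX 3.0 names, but verify them against the DataX version you deploy before relying on them:

```python
import json

# Sketch of a DataX "setting" block combining the three flow-control modes.
# NOTE: "byte", "record", and "errorLimit" are assumed key names taken from
# common DataX 3.0 usage; check them against your DataX release.
setting = {
    "speed": {
        "channel": 2,      # concurrency: number of parallel channels
        "byte": 1048576,   # byte-stream limit: at most 1 MiB/s overall
        "record": 10000    # record-stream limit: at most 10000 records/s
    },
    "errorLimit": {        # dirty-data tolerance (optional)
        "record": 0,       # fail the job on any dirty record
        "percentage": 0.02
    }
}

print(json.dumps(setting, indent=4))
```

Building the block as a Python dict and serializing it with json.dumps guarantees the fragment is syntactically valid JSON before it is pasted into a job file.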

3.2 Write a Configuration File from the Template

(1) Configure the read side of the job

The template already generates a basic reader configuration; now fill in the reader details by hand. The configuration below emits 5 records, where the first field is a Long with value 10 and the second is a String with value hello,DataX:

"reader": {
    "name": "streamreader",
    "parameter": {
        "column": [
            {
                "type": "long",
                "value": "10"
            },
            {
                "type": "string",
                "value": "hello,DataX"
            }
        ],
        "sliceRecordCount": "5"
    }
}
(2) Configure the write side of the job

With the reader configured, continue with the writer configuration:

"writer": {
    "name": "streamwriter",
    "parameter": {
        "encoding": "UTF-8",
        "print": true
    }
}

(3) Configure channel control

After the steps above, configure the synchronization rate:

"setting": {
    "speed": {
        "channel": "1"
    }
}

Combine the pieces into a configuration file named stream2stream.json and place it in the /opt/datax/job/ directory:


{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [
                            {
                                "type": "long",
                                "value": "10"
                            },
                            {
                                "type": "string",
                                "value": "hello,DataX"
                            }
                        ],
                        "sliceRecordCount": "5"
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "UTF-8",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
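If you generate job files from scripts rather than editing them by hand, the same configuration can be assembled programmatically. A minimal sketch (the output path in the comment is just the directory used in this tutorial):

```python
import json

# Assemble the stream2stream job as a Python dict and serialize it to JSON.
# Writing via json.dumps guarantees the file is syntactically valid, which
# avoids hand-editing mistakes such as missing commas or braces.
job = {
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [
                            {"type": "long", "value": "10"},
                            {"type": "string", "value": "hello,DataX"},
                        ],
                        "sliceRecordCount": "5",
                    },
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {"encoding": "UTF-8", "print": True},
                },
            }
        ],
        "setting": {"speed": {"channel": "1"}},
    }
}

config = json.dumps(job, indent=4)
print(config)
# To write the job file:
#   with open("/opt/datax/job/stream2stream.json", "w") as f:
#       f.write(config)
```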

3.3 Run

Run the following command:

python /opt/datax/bin/datax.py /opt/datax/job/stream2stream.json

It outputs the following information:

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

2022-04-30 23:19:42.460 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2022-04-30 23:19:42.469 [main] INFO  Engine - the machine info  =>

...

2022-04-30 23:19:42.492 [main] INFO  Engine -

{
	"content":[
		{
			"reader":{
				"name":"streamreader",
				"parameter":{
					"column":[
						{
							"type":"long",
							"value":"10"
						},
						{
							"type":"string",
							"value":"hello,DataX"
						}
					],
					"sliceRecordCount":"5"
				}
			},
			"writer":{
				"name":"streamwriter",
				"parameter":{
					"encoding":"UTF-8",
					"print":true
				}
			}
		}
	],
	"setting":{
		"speed":{
			"channel":"1"
		}
	}
}


2022-04-30 23:19:42.511 [main] WARN  Engine - prioriy set to 0, because NumberFormatException, the value is: null
2022-04-30 23:19:42.513 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2022-04-30 23:19:42.513 [main] INFO  JobContainer - DataX jobContainer starts job.
2022-04-30 23:19:42.515 [main] INFO  JobContainer - Set jobId = 0
2022-04-30 23:19:42.530 [job-0] INFO  JobContainer - jobContainer starts to do prepare ...
2022-04-30 23:19:42.531 [job-0] INFO  JobContainer - DataX Reader.Job [streamreader] do prepare work .
2022-04-30 23:19:42.532 [job-0] INFO  JobContainer - DataX Writer.Job [streamwriter] do prepare work .
2022-04-30 23:19:42.532 [job-0] INFO  JobContainer - jobContainer starts to do split ...
2022-04-30 23:19:42.532 [job-0] INFO  JobContainer - Job set Channel-Number to 1 channels.
2022-04-30 23:19:42.532 [job-0] INFO  JobContainer - DataX Reader.Job [streamreader] splits to [1] tasks.
2022-04-30 23:19:42.533 [job-0] INFO  JobContainer - DataX Writer.Job [streamwriter] splits to [1] tasks.
2022-04-30 23:19:42.549 [job-0] INFO  JobContainer - jobContainer starts to do schedule ...
2022-04-30 23:19:42.554 [job-0] INFO  JobContainer - Scheduler starts [1] taskGroups.
2022-04-30 23:19:42.556 [job-0] INFO  JobContainer - Running by standalone Mode.
2022-04-30 23:19:42.563 [taskGroup-0] INFO  TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2022-04-30 23:19:42.567 [taskGroup-0] INFO  Channel - Channel set byte_speed_limit to -1, No bps activated.
2022-04-30 23:19:42.567 [taskGroup-0] INFO  Channel - Channel set record_speed_limit to -1, No tps activated.
2022-04-30 23:19:42.580 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
10	hello,DataX
10	hello,DataX
10	hello,DataX
10	hello,DataX
10	hello,DataX
2022-04-30 23:19:42.685 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[107]ms
2022-04-30 23:19:42.686 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] completed it's tasks.
2022-04-30 23:19:52.573 [job-0] INFO  StandAloneJobContainerCommunicator - Total 5 records, 65 bytes | Speed 6B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2022-04-30 23:19:52.573 [job-0] INFO  AbstractScheduler - Scheduler accomplished all tasks.
2022-04-30 23:19:52.574 [job-0] INFO  JobContainer - DataX Writer.Job [streamwriter] do post work.
2022-04-30 23:19:52.574 [job-0] INFO  JobContainer - DataX Reader.Job [streamreader] do post work.
2022-04-30 23:19:52.575 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.
2022-04-30 23:19:52.576 [job-0] INFO  HookInvoker - No hook invoked, because base dir not exists or is a file: /opt/datax/hook
2022-04-30 23:19:52.579 [job-0] INFO  JobContainer -
	 [total cpu info] =>
		averageCpu                     | maxDeltaCpu                    | minDeltaCpu
		-1.00%                         | -1.00%                         | -1.00%


	 [total gc info] =>
		 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime
		 PS MarkSweep         | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s
		 PS Scavenge          | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s

2022-04-30 23:19:52.580 [job-0] INFO  JobContainer - PerfTrace not enable!
2022-04-30 23:19:52.580 [job-0] INFO  StandAloneJobContainerCommunicator - Total 5 records, 65 bytes | Speed 6B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2022-04-30 23:19:52.581 [job-0] INFO  JobContainer -
Job start time                 : 2022-04-30 23:19:42
Job end time                   : 2022-04-30 23:19:52
Total elapsed time             :                 10s
Average traffic                :                6B/s
Record write speed             :              0rec/s
Total records read             :                   5
Total read/write failures      :                   0
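For automation, the statistics line DataX prints near the end of the log is easy to parse. A hedged sketch (the regex matches the log format shown above; other DataX versions may format this line differently):

```python
import re

# Parse the summary line DataX prints at the end of a job. The sample line
# below is copied from the log output shown earlier in this article.
line = ("2022-04-30 23:19:52.580 [job-0] INFO  StandAloneJobContainerCommunicator - "
        "Total 5 records, 65 bytes | Speed 6B/s, 0 records/s | "
        "Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  "
        "All Task WaitReaderTime 0.000s | Percentage 100.00%")

match = re.search(r"Total (\d+) records, (\d+) bytes \|.*?Error (\d+) records", line)
if match:
    total_records, total_bytes, error_records = (int(g) for g in match.groups())
    # A run is healthy when no records failed.
    healthy = error_records == 0
    print(total_records, total_bytes, error_records, healthy)  # prints: 5 65 0 True
```

A wrapper script can grep this line from the DataX log and fail the pipeline when error_records is nonzero.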
