去O数据同步助手-DataX

Posted 浪子尘晨

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了去O数据同步助手-DataX相关的知识,希望对你有一定的参考价值。

简介

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 mysql、SQL Server、Oracle、PostgreSQL、HDFS、Hive、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。

  • Reader:将不同数据源的同步抽象为从源头数据源读取数据的Reader插件
  • Writer:向目标端写入数据的Writer插件

理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下图:

类型

数据源

Reader(读)

Writer(写)

文档

RDBMS 关系型数据库

MySQL

Oracle

OceanBase

SQLServer

PostgreSQL

DRDS

通用RDBMS(支持所有关系型数据库)

阿里云数仓数据存储

ODPS

ADS

OSS

OCS

NoSQL数据存储

OTS

Hbase0.94

Hbase1.1

Phoenix4.x

Phoenix5.x

MongoDB

Hive

Cassandra

无结构化数据存储

TxtFile

FTP

HDFS

Elasticsearch

时间序列数据库

OpenTSDB

TSDB

安装

Datax是开箱即用的,下载地址:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz,下载后解压至本地某个目录,进入bin目录,即可运行同步作业:

$ cd  YOUR_DATAX_HOME/bin
$ python datax.py YOUR_JOB.json

注:datax是通过python脚本启动的,需要先安装python

使用

第一步,创建作业的配置文件(json格式)

以oracle同步mysql为例,t_ad_config.json如下所示:


    "job": 
        "setting": 
            "speed": 
                "channel": 2
            ,
            "errorLimit": 
                "record": 0
            
        ,
        "content": [
            
                "reader": 
                    "name": "oraclereader",
                    "parameter": 
                        "column": [
                            "ID",
                            "FLIGHT_NO",
                            "DEPARTURE_AIRPORT",
                            "ARRIVAL_AIRPORT",
                            "MEMO",
                            "AD_FLAG",
                            "CREATOR",
                            "CREATE_TM",
                            "MODIFY_TM",
                            "MODIFIER",
                            "DELETE_FLAG"
                        ],
                        "connection": [
                            
                                "jdbcUrl": [
                                    "jdbc:oracle:thin:@$r_ip:$r_port:$r_dbname"
                                ],
                                "table": [
                                    "t_ad_config"
                                ]
                            
                        ],
                        "username": "$r_username",
                        "password": "$r_password",
                        "splitPk": "ID"
                    
                ,
                "writer": 
                    "name": "mysqlwriter",
                    "parameter": 
                        "writeMode": "insert",
                        "column": [
                            "ID",
                            "FLIGHT_NO",
                            "DEPARTURE_AIRPORT",
                            "ARRIVAL_AIRPORT",
                            "MEMO",
                            "AD_FLAG",
                            "CREATOR",
                            "CREATE_TM",
                            "MODIFY_TM",
                            "MODIFIER",
                            "DELETE_FLAG"
                        ],
                        "connection": [
                            
                                "jdbcUrl": "jdbc:mysql://$w_ip:$w_port/$w_dbname?useUnicode=true&characterEncoding=utf8&useSSL=false&connectionTimeZone=GMT%2B8&forceConnectionTimeZoneToSession=true",
                                "table": [
                                    "t_ad_config"
                                ]
                            
                        ],
                        "preSql": [
                            "truncate table @table"
                        ],
                        "username": "$w_username",
                        "password": "$w_password"
                    
                
            
        ]
    

参数说明

**speed**

描述:提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在库可以承受的范围内达到最佳的同步速度。

"speed": "channel": 5, "byte": 1048576, "record": 10000

**errorLimit**

描述:出错限制。如:"errorLimit": "record": 0,"percentage": 0.0

record:出错的record条数上限,当大于该值即报错。

percentage:出错的record百分比上限 1.0表示100%,0.02表示2%

**column**

描述:所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用*代表默认使用所有列配置,例如["*"],不推荐使用"*"。

**splitPk**

描述:MysqlReader进行数据抽取时,如果指定splitPk,表示用户希望使用splitPk代表的字段进行数据分片,DataX因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能。

推荐splitPk用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。

目前splitPk仅支持整形数据切分,`不支持浮点、字符串、日期等其他类型`。如果用户指定其他非支持类型,MysqlReader将报错!

如果splitPk不填写,包括不提供splitPk或者splitPk值为空,DataX视作使用单通道同步该表数据。

**where**

描述:筛选条件,MysqlReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据抽取。在实际业务场景中,往往会选择当天的数据进行同步,可以将where条件指定为create_tm > sysdate - 1 。注意:不可以将where条件指定为limit 10,limit不是SQL的合法where子句。

where条件可以有效地进行业务增量同步。如果不填写where语句,包括不提供where的key或者value,DataX均视作同步全量数据。

**querySql**

描述:在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可以通过该配置型来自定义筛选SQL。当用户配置了这一项之后,DataX系统就会忽略table,column,where这些配置型,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id

**preSql**

描述:写入数据到目的表前,会先执行这里的标准语句。如果 Sql 中有你需要操作到的表名称,请使用 `@table` 表示,这样在实际执行 Sql 语句时,会对变量按照实际表名称进行替换。比如你的任务是要写入到目的端的100个同构分表(表名称为:datax_00,datax01, ... datax_98,datax_99),并且你希望导入数据前,先对表中数据进行删除操作,那么你可以这样配置:`"preSql":["delete from 表名"]`,效果是:在执行到每个表写入数据前,会先执行对应的 delete from 对应表名称。

**postSql**

描述:写入数据到目的表后,会执行这里的标准语句。(原理同 preSql )

**writeMode**

描述:控制写入数据到目标表采用 `insert into` 或者 `replace into` 或者 `ON DUPLICATE KEY UPDATE` 语句

所有选项:insert/replace/update

**batchSize**

描述:一次性批量提交的记录数大小,该值可以极大减少DataX与Mysql的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成DataX运行进程OOM情况。

第二步,启动DataX

$ cd F:/datax/datax/bin
$ python datax.py ../job/telegram/t_ad_config.json

作业生成

下面是用于生成datax任务json文件的工具,template.json和DataxJson.py


  "job": 
    "setting": 
      "speed": 
        "channel": 2
      ,
      "errorLimit": 
        "record": 0,
        "percentage": 0.00
      
    ,
    "content": [
      
        "reader": 
          "name": "oraclereader",
          "parameter": 
            "column" : ["*"],
            "connection": [
              
                "jdbcUrl": ["jdbc:oracle:thin:@$r_ip:$r_port:$r_dbname"],
                "table": ["T_AD_CONFIG"]
              
            ],
            "username": "$r_username",
            "password": "$r_password",
            "splitPk": "AIRPORT_4CODE"
          
        ,
        "writer": 
          "name": "mysqlwriter",
          "parameter": 
            "writeMode": "insert",
            "column": ["*"],
            "connection": [
              
                "jdbcUrl": "jdbc:mysql://$w_ip:$w_port/$w_dbname?useUnicode=true&characterEncoding=utf8&useSSL=false&connectionTimeZone=GMT%2B8&forceConnectionTimeZoneToSession=true",
                "table": ["T_AD_CONFIG"]
              
            ],
            "preSql": [
              "truncate table @table"
            ],
            "username": "$w_username",
            "password": "$w_password"
          
        
      
    ]
  
#!/usr/bin/env python3

import pymysql
import json

# key: tableName, value: splitPk
tableNames=
    "t_fp_area_config": "",
    "t_fp_attachment": "",
    "t_fp_assess_flight": "",
    "t_fp_lease_flight": "",
    "t_fp_navigation": "",
    "t_fp_navigation_library": "",
    "t_fp_requirement": "",
    "t_fp_support_standard": "",
    "t_fp_transport_capacity": "",
    "t_fp_email_config": "",

db_host="10.88.20.10"
db_port=3307
db_tableSchema="planning"
db_username="planning"
db_password="planning"

# 获取表的信息
def getTableInfo(connection, tableSchema, tableName):
    cursor = connection.cursor()
    sql = "select column_name, column_key from information_schema.columns where table_name=%s and table_schema=%s"
    cursor.execute(sql, (tableName, tableSchema))
    return cursor.fetchall()

# 生成每个表的同步JSON文件
def generatorJson(tableInfo, splitPk):
    columnNames=[]
    columnKeys=[]
    for cloumnName in tableInfo:
        columnNames.append(cloumnName[0])
        if cloumnName[1] == "PRI":
            columnKeys.append(cloumnName[0])
    if len(columnKeys) == 1 and len(splitPk) == 0:
        splitPk = columnKeys[0]

    jsonObj=
    with open("template.json", 'r') as f:
        jsonObj = json.load(f)

    content = jsonObj["job"]["content"][0]
    content["reader"]["parameter"]["column"] = columnNames
    content["reader"]["parameter"]["splitPk"] = splitPk
    content["reader"]["parameter"]["connection"][0]["table"] = [tableName]
    content["writer"]["parameter"]["column"] = columnNames
    content["writer"]["parameter"]["connection"][0]["table"] = [tableName]
    # 生成全量同步JSON配置
    with open(tableName + ".json", "w") as f:
        json.dump(jsonObj, f, indent=4)
        print(tableName + ".json文件已生成")

    content["reader"]["parameter"]["where"] = "MODIFY_TM >= sysdate - $incr_time"
    content["writer"]["parameter"]["preSql"] = ["delete from @table where MODIFY_TM >= NOW() - INTERVAL \\'$incr_time\\' DAY"]
    # 生成增量同步JSON配置
    with open(tableName + "-incr.json", "w") as f:
        json.dump(jsonObj, f, indent=4)
        print(tableName + "-incr.json文件已生成")

# 生成datax执行shell脚本
def generatorShell():
    fullFile = open("full_sync_task.sh", "w", encoding='utf-8')
    fullFile.write(
        """#!/bin/bash

CHCP 65001

datax_home=\\"F:/datax/datax\\"

# 读库的IP
r_ip=\\"10.88.27.103\\"
# 读库的端口
r_port=\\"1521\\"
# 读库的数据库名称
r_dbname=\\"focnew\\"
# 读库的账号
r_username=\\"focnew_uat\\"
# 读库的密码
r_password=\\"focnewuat#1\\"

# 写库的IP
w_ip=\\"10.88.27.166\\"
# 写库的端口
w_port=\\"3307\\"
# 写库的数据库名称
w_dbname=\\"planning\\"
# 写库的账号
w_username=\\"planning\\"
# 写库的密码
w_password=\\"planning\\"
""")
    for tableName in tableNames:
        fullFile.write("\\npython $datax_home/bin/datax.py $datax_home/job/xxx/" + tableName + ".json -p \\"-Dr_ip=$r_ip -Dr_port=$r_port -Dr_dbname=$r_dbname -Dr_username=$r_username -Dr_password=$r_password -Dw_ip=$w_ip -Dw_port=$w_port -Dw_dbname=$w_dbname -Dw_username=$w_username -Dw_password=$w_password\\"")
    fullFile.close()

connection = pymysql.connect(host=db_host, port=db_port, user=db_username, passwd=db_password, database=db_tableSchema)

for tableName in tableNames:
    print("开始生成Table:" + tableName + "的datax同步文件")
    tableInfo = getTableInfo(connection, db_tableSchema, tableName)
    generatorJson(tableInfo, tableNames[tableName])

# 生成同步shell脚本
generatorShell()

以上是关于去O数据同步助手-DataX的主要内容,如果未能解决你的问题,请参考以下文章

DataX大数据量同步优化方案

数据同步中间件DataX

大数据技术之DataX

异构数据源离线同步工具之DataX的安装部署

使用 DataX 实现数据同步(高效的数据同步工具)

数据同步工具DataX