DataX的安装及使用

Posted 星辰安安

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了DataX的安装及使用相关的知识,希望对你有一定的参考价值。

DataX的安装及使用

Hive通过外部表与HBase表关联

hive建表语句:
// 第一个字段通常命名为key
CREATE EXTERNAL TABLE hivehbasetable(
    key INT
    ,name STRING
    ,age INT
    ,gender STRING
    ,clazz  STRING
    ,last_mod STRING
) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
WITH SERDEPROPERTIES (
    "hbase.columns.mapping" = ":key,cf1:name,cf1:age,cf1:gender,cf1:clazz,cf1:last_mod") 
    TBLPROPERTIES("hbase.table.name" = "student");
hbase表
create 'student','cf1'
# 自己导入数据
直接执行查询语句:
select key,name from hivehbasetable limit 10;

hbase外部表 不能使用sqoop直接导入数据,必须通过例如:insert into这样的形式导入
因为sqoop导入数据,使用的原理是load data,load data只能在表的存储格式为textfile时,才能真正将数据加载到表中

DataX的安装

DataX不需要依赖其他服务,直接上传、解压、安装、配置环境变量即可
也可以直接在windows上解压

DataX的使用

stream2stream

编写配置文件stream2stream.json
# stream2stream.json

  "job": 
    "content": [
      
        "reader": 
          "name": "streamreader",
          "parameter": 
            "sliceRecordCount": 10,
            "column": [
              
                "type": "long",
                "value": "10"
              ,
              
                "type": "string",
                "value": "hello,你好,世界-DataX"
              
            ]
          
        ,
        "writer": 
          "name": "streamwriter",
          "parameter": 
            "encoding": "UTF-8",
            "print": true
          
        
      
    ],
    "setting": 
      "speed": 
        "channel": 5
       
    
  

执行同步任务
datax.py stream2stream.json
执行结果

mysql2mysql

需要新建student2数据库,并创建student表

编写配置文件mysql2mysql.json

    "job": 
        "content": [
            
                "reader": 
                    "name": "mysqlreader",
                    "parameter": 
                        "username": "root",
                        "password": "123456",
                        "column": [
                            "id",
                            "name",
                            "age",
                            "gender",
                            "clazz",
                            "last_mod"
                        ],
                        "splitPk": "age",
                        "connection": [
                            
                                "table": [
                                    "student"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/student"
                                ]
                            
                        ]
                    
                ,
                "writer": 
                    "name": "mysqlwriter",
                    "parameter": 
                        "writeMode": "insert",
                        "username": "root",
                        "password": "123456",
                        "column": [
                            "id",
                            "name",
                            "age",
                            "gender",
                            "clazz",
                            "last_mod"
                        ],
                        "preSql": [
                            "truncate student2"
                        ],                        
                        "connection": [
                            
                                "jdbcUrl": "jdbc:mysql://master:3306/student2?useUnicode=true&characterEncoding=utf8",
                                "table": [
                                    "student2"
                                ]
                            
                        ]
                    
                
            
        ],
        "setting": 
            "speed": 
                "channel": 6
            
        
    

执行同步任务
datax.py mysql2mysql.json

mysql2hdfs

写hive跟hdfs时一样的

编写配置文件mysql2hdfs.json

    "job": 
        "content": [
            
                "reader": 
                    "name": "mysqlreader",
                    "parameter": 
                        "username": "root",
                        "password": "123456",
                        "column": [
                            "id",
                            "name",
                            "age",
                            "gender",
                            "clazz",
                            "last_mod"
                        ],
                        "splitPk": "age",
                        "connection": [
                            
                                "table": [
                                    "student"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/student"
                                ]
                            
                        ]
                    
                ,
                "writer": 
                    "name": "hdfswriter",
                    "parameter": 
                        "defaultFS": "hdfs://master:9000",
                        "fileType": "text",
                        "path": "/user/hive/warehouse/datax.db/students",
                        "fileName": "student",
                        "column": [
                            
                                "name": "id",
                                "type": "bigint"
                            ,
                            
                                "name": "name",
                                "type": "string"
                            ,
                            
                                "name": "age",
                                "type": "INT"
                            ,
                            
                                "name": "gender",
                                "type": "string"
                            ,
                            
                                "name": "clazz",
                                "type": "string"
                            ,
                            
                                "name": "last_mod",
                                "type": "string"
                            
                        ],
                        "writeMode": "append",
                        "fieldDelimiter": ","
                    
                
            
        ],
        "setting": 
            "speed": 
                "channel": 6
            
        
    

hbase2mysql


    "job": 
        "content": [
            
                "reader": 
                    "name": "hbase11xreader",
                    "parameter": 
                        "hbaseConfig": 
                            "hbase.zookeeper.quorum": "master:2181"
                        ,
                        "table": "student",
                        "encoding": "utf-8",
                        "mode": "normal",
                        "column": [
                            
                                "name": "rowkey",
                                "type": "string"
                            ,
                            
                                "name": "cf1:name",
                                "type": "string"
                            ,
                            
                                "name": "cf1:age",
                                "type": "string"
                            ,
                            
                                "name": "cf1:gender",
                                "type": "string"
                            ,
                            
                                "name": "cf1:clazz",
                                "type": "string"
                            
                        ],
                        "range": 
                            "startRowkey": "",
                            "endRowkey": "",
                            "isBinaryRowkey": false
                        
                    
                ,
                "writer": 
                    "name": "mysqlwriter",
                    "parameter": 
                        "writeMode": "insert",
                        "username": "root",
                        "password": "123456",
                        "column": [
                            "id",
                            "name",
                            "age",
                            "gender",
                            "clazz"
                        ],
                        "preSql": [
                            "truncate student2"
                        ],                        
                        "connection": [
                            
                                "jdbcUrl": "jdbc:mysql://master:3306/student2?useUnicode=true&characterEncoding=utf8",
                                "table": [
                                    "student2"
                                ]
                            
                        ]
                    
                
            
        ],
        "setting": 
            "speed": 
                "channel": 6
            
        
    

mysql2hbase

mysql中的score表需将cource_id改为course_id,并将student_id、course_id设为主键,并将所有字段的类型改为int
hbase需先创建score表:create ‘score’,‘cf1’


    "job": 
        "content": [
            
                "reader": 
                    "name": "mysqlreader",
                    "parameter": 
                        "username": "root",
                        "password": "123456",
                        "column": [
                            "student_id",
                            "course_id",
                            "score"
                        ],
                        "splitPk": "course_id",
                        "connection": [
                            
                                "table": [
                                    "score"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/student"
                                ]
                            
                        ]
                    
                ,
                "writer": 
                    "name": "hbase11xwriter",
                    "parameter": 
                      "hbaseConfig": 
                        "hbase.zookeeper.quorum": "master:2181"
                      ,
                      "table": "score",
                      "mode": "normal",
                      "rowkeyColumn": [
                          
                            "index":0,
                            "type":"string"
                          ,
                          
                            "index":-1,
                            "type":"string",
                            "value":"_"
                          ,
                          
                            "index":1,
                            "type":"string"
                          
                      ],
                      "column": [
                        
                          "index":2,
                          "name": "cf1:score",
                          "type": "int"
                        
                      ],
                      "encoding": "utf-8"
                    
                  
            
        ],
        "setting": 
            "speed": 
                "channel": 6
            
        
    

DataX-web的安装

1、使用idea同步源代码
2、在mysql5.1中新建dataxweb数据库
3、执行数据库初始化脚本
4、修改admin配置文件
  • bootstrap.properties

#Database
DB_HOST=rm-bp1h7v927zia3t8iwho.mysql.rds.aliyuncs.com
DB_PORT=3306
DB_USERNAME=datax
DB_PASSWORD=123456
DB_DATABASE=datax_web
  • application.yml
server:
  #port: 8080
  port: 8080
spring:
  #数据源
  datasource:
    #    username: root
    #password: root
    #url: jdbc:mysql://localhost:3306/datax_web?serverTimezone=Asia/Shanghai&useLegacyDatetimeCode=false&useSSL=false&nullNamePatternMatchesAll=true&useUnicode=true&characterEncoding=UTF-8
    password: $DB_PASSWORD:password
    username: $DB_USERNAME:username
    url: jdbc:mysql://$DB_HOST:127.0.0.1:$DB_PORT:3306/$DB_DATABASE:dataxweb?serverTimezone=Asia/Shanghai&useLegacyDatetimeCode=false&useSSL=false&nullNamePatternMatchesAll=true&useUnicode=true&characterEncoding=UTF-8
    driver-class-name: com.mysql.jdbc.Driver


    hikari:
      ## 最小空闲连接数量
      minimum-idle: 5
      ## 空闲连接存活最大时间,默认600000(10分钟)
      idle-timeout: 180000
      ## 连接池最大连接数,默认是10
      maximum-pool-size: 10
      ## 数据库连接超时时间,默认30秒,即30000
      connection-timeout: 30000
      connection-test-query: SELECT 1
      ##此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
      max-lifetime: 1800000

  # datax-web email
  mail:
    host: smtp.qq.com
    port: 25
    #username: xxx@qq.com
    #password: xxx
    username: test1
    password: test1
    properties:
      mail:
        smtp:
          auth: true
          starttls:
            enable: true
            required: true
        socketFactory:
          class: javax.net.ssl.SSLSocketFactory


management:
  health:
    mail:
      enabled: false
  server:
    servlet:
      context-path: /actuator

mybatis-plus:
  # mapper.xml文件扫描
  mapper-locations: classpath*:/mybatis-mapper/*Mapper.xml
  # 实体扫描,多个package用逗号或者分号分隔
  #typeAliasesPackage: com.yibo.essyncclient.*.entity
  global-config:
    # 数据库相关配置
    db-config:
      # 主键类型  AUTO:"数据库ID自增", INPUT:"用户输入ID", ID_WORKER:"全局唯一ID (数字类型唯一ID)", UUID:"全局唯一ID UUID";
      id-type: AUTO
      # 字段策略 IGNORED:"忽略判断",NOT_NULL:"非 NULL 判断"),NOT_EMPTY:"非空判断"
      field-strategy: NOT_NULL
      # 驼峰下划线转换
      column-underline: true
      # 逻辑删除
      logic-delete-value: 0
      logic-not-delete-value: 1
      # 数据库类型
      db-type: mysql
    banner: false
  # mybatis原生配置
  configuration:
    map-underscore-to-camel-case: true
    cache-enabled: false
    call-setters-on-nulls: true
    jdbc-type-for-null: 'null'
    type-handlers-package: com.wugui.datax.admin.core.handler

# 配置mybatis-plus打印sql日志
logging:
  #level:
  #  com.wugui.datax.admin.mapper: info
#path: ./data/applogs/admin
  level:
    com.wugui.datax.admin.mapper: error
    path: $data.path/applogs/admin



#datax-job, access token
datax:
  job:
    accessToken:
    #i18n (default empty as chinese, "en" as english)
    i18n:
    ## triggerpool max size
    triggerpool:
      fast:
        max: 200
      slow:
        max: 100
      ### log retention days
    logretentiondays: 30

datasource:
  aes:
    key: AD42F6697B035B75
5、启动DataX-Web-Admin

  • 访问: http://127.0.0.1:8080/index.html,用户名:admin,密码:123456

6、修改datax-web-executor配置文件
  • application.yml

# web port
server:
  port: 8081
  #port: 8081

# log config
logging:
  config: classpath:logback.xml
  path: ./applogs/executor/jobhandler
  #path: ./data/applogs/executor/jobhandler

datax:
  job:
    admin:
      ### datax admin address list, such as "http://address" or "http://address01,http://address02"
      #addresses: http://127.0.0.1:8080
      addresses: http://127.0.0.1:8080
    executor:
      appname: datax-executor
      ip:
      #port: 9999
      port: 9999
      ### job log path
      #logpath: ./data/applogs/executor/jobhandler
      logpath: ./applogs/executor/jobhandler
      ### job log retention days
      logretentiondays: 30
    ### job, access token
    accessToken:

  executor:
    #jsonpath: D:\\\\temp\\\\executor\\\\json\\\\
    jsonpath: ./json

  #pypath: F:\\tools\\datax\\bin\\datax.py
  pypath: C:\\Users\\zzk10\\Documents\\MacOS\\DataIntegrate\\datax\\datax\\bin\\datax.py

注意pypath:这个路径需要解压datax.tar.gz到自己的路径,不要使用中文路径

解压后使用datax-web中的python3脚本替换掉 datax/bin/

7、启动datax-web-executor

只要进程没有自己停止,一直在后台运行即可

DataX-Web的使用

需要先安装Python3环境

新建数据源

  • mysql

  • hive

    需要先启动hiveserver2服务
    下面这条命令需要在linux shell中执行
    hive --service hiveserver2

新建项目test1

新建任务模板

构建任务

  • 创建reader

  • 创建writer

  • 字段映射

  • 构建任务

  • 执行任务

  • 查看日志

添加hbase 数据源会报错,需要修改两个pom.xml文件

改完pom文件,记得重新reimport,才会生效

  • 重新测试hbase连通性

以上是关于DataX的安装及使用的主要内容,如果未能解决你的问题,请参考以下文章

DataX的安装及使用

DataX的安装及使用

DataX的安装及使用

DataX windows安装及基本使用

DataX安装及基本使用

datax详细介绍及使用