MySQL数据同步到HIVE,自动生成MySQL表对应HIVE的建表语句

Posted 小基基o_O

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MySQL数据同步到HIVE,自动生成MySQL表对应HIVE的建表语句相关的知识,希望对你有一定的参考价值。

文章目录

背景

  • mysql数据同步到HIVE时,要写HIVE的建表语句
    数据类型全写STRING不够好,对此写个自动转换程序
    转换后不一定是最终的,仍可能要微调
  • 开发语言:Python3

MySQL列数据类型转成HIVE的

from re import fullmatch

def column_type_mysql2hive(mysql_column_type):
    """Map a MySQL column type to the Hive column type used in generated DDL.

    Args:
        mysql_column_type: Column type text in the form MySQL reports it,
            e.g. ``'int unsigned'``, ``'tinyint(3) unsigned zerofill'`` or
            ``'decimal(9,2)'``.

    Returns:
        The Hive type name. Unsigned integers are widened to the next larger
        signed Hive type, ``decimal`` keeps its ``(precision,scale)`` part,
        and every type without an explicit mapping becomes ``'STRING'``.
    """
    # Compare case-insensitively and ignore surrounding whitespace.
    normalized = mysql_column_type.strip().lower()
    # Modifiers are the whitespace-separated words after the base type.
    # MySQL adds UNSIGNED to ZEROFILL columns and reports them as
    # "... unsigned zerofill", so "unsigned" is not always the last word.
    modifiers = normalized.split()[1:]
    unsigned = 'unsigned' in modifiers or 'zerofill' in modifiers

    # (MySQL type prefix, Hive type when signed, Hive type when unsigned)
    integer_types = (
        ('tinyint', 'TINYINT', 'SMALLINT'),
        ('smallint', 'SMALLINT', 'INT'),
        ('mediumint', 'INT', 'INT'),
        ('int', 'INT', 'BIGINT'),
        # An unsigned BIGINT may overflow Hive's BIGINT; change the last
        # entry to 'STRING' if the data actually exceeds the signed range.
        ('bigint', 'BIGINT', 'BIGINT'),
    )
    for prefix, signed_type, unsigned_type in integer_types:
        if normalized.startswith(prefix):
            return unsigned_type if unsigned else signed_type

    if normalized.startswith('double'):
        return 'DOUBLE'
    if normalized.startswith('float'):
        return 'FLOAT'
    if normalized.startswith('decimal'):
        # Keep only "(precision,scale)"; drop UNSIGNED / ZEROFILL modifiers,
        # which Hive's DECIMAL does not accept.
        start = normalized.find('(')
        end = normalized.find(')', start)
        precision = normalized[start:end + 1] if 0 <= start < end else ''
        return 'DECIMAL' + precision
    # Everything else (dates, text, blobs, bit, enum, ...) is loaded as text.
    return 'STRING'
| MySQL | HIVE |
| --- | --- |
| tinyint | TINYINT |
| tinyint unsigned | SMALLINT |
| smallint | SMALLINT |
| smallint unsigned | INT |
| mediumint 和 mediumint unsigned | INT |
| int | INT |
| int unsigned | BIGINT |
| bigint | BIGINT |
| bigint unsigned | BIGINT 或 STRING |
| decimal | DECIMAL |
| float | FLOAT |
| double | DOUBLE |
  • 按上述映射关系转,不在上述表格的数据类型统一转STRING
  • 注意:
    1、无符号BIGINT转HIVE的BIGINT可能会越界;如果越界,就考虑转STRING或DECIMAL(20,0)
    2、建议bit类型数据,用bin函数转成字符串,再导入到HDFS

自动生成MySQL表对应HIVE建表语句完整代码

from re import fullmatch
from pymysql import Connection  # conda install -y pymysql

# Each template below is exposed as its bound ``str.format`` method, so it is
# called with keyword arguments, e.g. SQL_COLUMNS(TABLE_SCHEMA=..., TABLE_NAME=...).
# The ``{...}`` placeholders are required: without them ``format`` returns the
# text unchanged and the query would look for a schema literally named
# 'TABLE_SCHEMA'.
# NOTE: values are interpolated as plain text; pass trusted schema/table names only.

# Query the column name, column type and column comment of a MySQL table
SQL_COLUMNS = '''
SELECT
  `COLUMN_NAME`     -- 列名
  ,`COLUMN_TYPE`    -- 类型
  ,`COLUMN_COMMENT` -- 列注释
FROM `information_schema`.`COLUMNS`
WHERE `TABLE_SCHEMA`='{TABLE_SCHEMA}'
  AND `TABLE_NAME`='{TABLE_NAME}'
ORDER BY `ORDINAL_POSITION`;
'''.strip().format

# Query the comment of a MySQL table
SQL_COMMENT = '''
SELECT `TABLE_COMMENT`
FROM `information_schema`.`TABLES`
WHERE `TABLE_SCHEMA`='{TABLE_SCHEMA}'
  AND `TABLE_NAME`='{TABLE_NAME}';
'''.strip().format

# Prefix for Hive table names
HIVE_PREFIX = 'ods_'

# Hive CREATE TABLE statement; `columns` is the pre-rendered column list
HIVE_DDL = '''
CREATE EXTERNAL TABLE `{table}`(
{columns}
) COMMENT '{table_comment}'
PARTITIONED BY (`ymd` STRING COMMENT '年月日');
'''.strip().format

# CREATE TABLE statement of the source MySQL table, printed for reference
MYSQL_DDL = "SHOW CREATE TABLE `{TABLE_SCHEMA}`.`{TABLE_NAME}`".format


def column_type_mysql2hive(mysql_column_type):
    """Map a MySQL column type to the Hive column type used in generated DDL.

    Args:
        mysql_column_type: Column type text in the form MySQL reports it,
            e.g. ``'int unsigned'``, ``'tinyint(3) unsigned zerofill'`` or
            ``'decimal(9,2)'``.

    Returns:
        The Hive type name. Unsigned integers are widened to the next larger
        signed Hive type, ``decimal`` keeps its ``(precision,scale)`` part,
        and every type without an explicit mapping becomes ``'STRING'``.
    """
    # Compare case-insensitively and ignore surrounding whitespace.
    normalized = mysql_column_type.strip().lower()
    # Modifiers are the whitespace-separated words after the base type.
    # MySQL adds UNSIGNED to ZEROFILL columns and reports them as
    # "... unsigned zerofill", so "unsigned" is not always the last word.
    modifiers = normalized.split()[1:]
    unsigned = 'unsigned' in modifiers or 'zerofill' in modifiers

    # (MySQL type prefix, Hive type when signed, Hive type when unsigned)
    integer_types = (
        ('tinyint', 'TINYINT', 'SMALLINT'),
        ('smallint', 'SMALLINT', 'INT'),
        ('mediumint', 'INT', 'INT'),
        ('int', 'INT', 'BIGINT'),
        # An unsigned BIGINT may overflow Hive's BIGINT; change the last
        # entry to 'STRING' if the data actually exceeds the signed range.
        ('bigint', 'BIGINT', 'BIGINT'),
    )
    for prefix, signed_type, unsigned_type in integer_types:
        if normalized.startswith(prefix):
            return unsigned_type if unsigned else signed_type

    if normalized.startswith('double'):
        return 'DOUBLE'
    if normalized.startswith('float'):
        return 'FLOAT'
    if normalized.startswith('decimal'):
        # Keep only "(precision,scale)"; drop UNSIGNED / ZEROFILL modifiers,
        # which Hive's DECIMAL does not accept.
        start = normalized.find('(')
        end = normalized.find(')', start)
        precision = normalized[start:end + 1] if 0 <= start < end else ''
        return 'DECIMAL' + precision
    # Everything else (dates, text, blobs, bit, enum, ...) is loaded as text.
    return 'STRING'


class Mysql:
    """Small wrapper around a MySQL connection that renders Hive DDL from MySQL metadata."""

    def __init__(self, **kwargs):
        """Open the connection and create a cursor.

        Keyword Args:
            user: Login name, defaults to ``'root'``.
            password: Login password (required).
            host: Server host, defaults to ``'localhost'``.
            database: Default database (required).
            port: Server port, defaults to ``3306``.
            charset: Connection charset, defaults to ``'UTF8'``.

        Raises:
            KeyError: If ``password`` or ``database`` is not supplied.
        """
        self.db = Connection(
            user=kwargs.pop('user', 'root'),
            password=kwargs.pop('password'),
            host=kwargs.pop('host', 'localhost'),
            database=kwargs.pop('database'),
            port=kwargs.pop('port', 3306),
            charset=kwargs.pop('charset', 'UTF8'),
        )
        self.cursor = self.db.cursor()

    def __del__(self):
        """Close the cursor and the connection if they were ever created."""
        # __init__ may have raised before either attribute was assigned;
        # guard so the finalizer does not raise AttributeError on top of it.
        cursor = getattr(self, 'cursor', None)
        if cursor is not None:
            cursor.close()
        db = getattr(self, 'db', None)
        if db is not None:
            db.close()

    def commit(self, sql):
        """Execute ``sql`` and commit; failures are printed, not raised (best effort)."""
        try:
            self.cursor.execute(sql)
            self.db.commit()
        except Exception as e:
            print(e)

    def fetchall(self, query):
        """Execute ``query`` and return every row."""
        self.cursor.execute(query)
        return self.cursor.fetchall()  # rows: tuple of tuples; no rows: ()

    @staticmethod
    def _escape_hive_string(text):
        """Escape backslashes and single quotes for a single-quoted Hive string literal."""
        return text.replace('\\', '\\\\').replace("'", "\\'")

    @staticmethod
    def _column_line(c_name, hive_type, c_comment):
        """Render one column definition line (no trailing comma) of the Hive DDL."""
        comment = Mysql._escape_hive_string(c_comment)
        return f"  `{c_name}` {hive_type} COMMENT '{comment}'"

    def get_columns(self, db, tb):
        """Return the Hive column definitions of table ``db``.``tb``, one per line.

        Lines are separated by ``',\\n'`` so the result can be dropped directly
        between the parentheses of a CREATE TABLE statement.
        """
        rows = self.fetchall(SQL_COLUMNS(TABLE_SCHEMA=db, TABLE_NAME=tb))
        return ',\n'.join(
            self._column_line(c_name, column_type_mysql2hive(c_type), c_comment)
            for c_name, c_type, c_comment in rows
        )

    def get_table_comment(self, db, tb):
        """Return the table comment of ``db``.``tb`` (first column of the first row)."""
        return self.fetchall(SQL_COMMENT(TABLE_SCHEMA=db, TABLE_NAME=tb))[0][0]

    def get_hive_ddl(self, db, tb, prefix='ods_mysql_', postfix='_full'):
        """Return the Hive CREATE TABLE statement for MySQL table ``db``.``tb``.

        The Hive table is named ``prefix + tb + postfix``.
        """
        columns = self.get_columns(db, tb)
        comment = self._escape_hive_string(self.get_table_comment(db, tb))
        table = prefix + tb + postfix
        return HIVE_DDL(table=table, columns=columns, table_comment=comment)

    def get_mysql_ddl(self, db, tb):
        """Return the source table's own CREATE TABLE statement, for comparison."""
        # SHOW CREATE TABLE yields one row: (table name, create statement).
        return self.fetchall(MYSQL_DDL(TABLE_SCHEMA=db, TABLE_NAME=tb))[0][1]


if __name__ == '__main__':
    # Demo: print the source MySQL DDL, then the generated Hive DDL.
    TABLE_SCHEMA, TABLE_NAME = 'z', 'yyy_inf'
    m = Mysql(database='information_schema', password='123456')
    sections = (
        ('源MySQL建表语句', m.get_mysql_ddl),
        ('HIVE建表语句', m.get_hive_ddl),
    )
    for title, build_ddl in sections:
        # Centre each title in a 99-character rule of dashes.
        print(title.center(99, '-'))
        print(build_ddl(TABLE_SCHEMA, TABLE_NAME))

附录

数据类型

| HIVE数值 | 大小 | 说明 | 范围 |
| --- | --- | --- | --- |
| TINYINT | 1-byte | signed integer | -128~127 |
| SMALLINT | 2-byte | signed integer | -32,768~32,767 |
| INT/INTEGER | 4-byte | signed integer | -2,147,483,648~2,147,483,647 |
| BIGINT | 8-byte | signed integer | -9,223,372,036,854,775,808~9,223,372,036,854,775,807 |
| FLOAT | 4-byte | 单精度浮点数 | |
| DOUBLE | 8-byte | 双精度浮点数 | |
| DECIMAL | | DECIMAL(precision, scale),默认decimal(10,0) | |
| NUMERIC | | 等同DECIMAL,始于Hive 3.0.0 | |

| HIVE时间 | 译名 | 示例 |
| --- | --- | --- |
| TIMESTAMP | 时间戳 | SELECT CURRENT_TIMESTAMP() => 2022-04-22 00:15:29.363 |
| DATE | 日期 | SELECT CURRENT_DATE() => 2022-04-22 |
| INTERVAL | 时间间隔 | SELECT INTERVAL '1' DAY => 1 00:00:00.000000000 |

| HIVE字符串 | 译名 | 范围 | 备注 |
| --- | --- | --- | --- |
| String | 字符串 | 无长度限制 | 不能太长,太长会影响性能 |
| Varchar | variable character,可变字符 | 1~65535 | 超出长度部分会被截断 |
| Char | 定长字符 | 1~255 | 不足长度部分将会用空格填充 |

| MySQL数值 | 大小 | 范围(有符号) | 范围(无符号) | 用途 |
| --- | --- | --- | --- | --- |
| TINYINT | 1 Bytes | (-128,127) | (0,255) | 小整数值 |
| SMALLINT | 2 Bytes | (-32 768,32 767) | (0,65 535) | 大整数值 |
| MEDIUMINT | 3 Bytes | (-8 388 608,8 388 607) | (0,16 777 215) | 大整数值 |
| INT或INTEGER | 4 Bytes | (-2 147 483 648,2 147 483 647) | (0,4 294 967 295) | 大整数值 |
| BIGINT | 8 Bytes | (-9 223 372 036 854 775 808,9 223 372 036 854 775 807) | (0,18 446 744 073 709 551 615) | 极大整数值 |
| FLOAT | 4 Bytes | (-3.402 823 466 E+38,-1.175 494 351 E-38),0,(1.175 494 351 E-38,3.402 823 466 E+38) | 0,(1.175 494 351 E-38,3.402 823 466 E+38) | 单精度浮点数值 |
| DOUBLE | 8 Bytes | (-1.797 693 134 862 315 7 E+308,-2.225 073 858 507 201 4 E-308),0,(2.225 073 858 507 201 4 E-308,1.797 693 134 862 315 7 E+308) | 0,(2.225 073 858 507 201 4 E-308,1.797 693 134 862 315 7 E+308) | 双精度浮点数值 |
| DECIMAL | DECIMAL(M,D):若M>D,则M+2,否则D+2 | 依赖于M和D的值 | 依赖于M和D的值 | 小数值 |

| MySQL字符串 | 大小 | 用途 |
| --- | --- | --- |
| CHAR | 0-255 bytes | 定长字符串 |
| VARCHAR | 0-65535 bytes | 变长字符串 |
| TINYBLOB | 0-255 bytes | 不超过 255 个字符的二进制字符串 |
| TINYTEXT | 0-255 bytes | 短文本字符串 |
| BLOB | 0-65 535 bytes | 二进制形式的长文本数据 |
| TEXT | 0-65 535 bytes | 长文本数据 |
| MEDIUMBLOB | 0-16 777 215 bytes | 二进制形式的中等长度文本数据 |
| MEDIUMTEXT | 0-16 777 215 bytes | 中等长度文本数据 |
| LONGBLOB | 0-4 294 967 295 bytes | 二进制形式的极大文本数据 |
| LONGTEXT | 0-4 294 967 295 bytes | 极大文本数据 |

| MySQL时间 | 大小 | 范围 |
| --- | --- | --- |
| date | 3 bytes | 1000-01-01 ~ 9999-12-31 |
| datetime | 8 bytes | 1000-01-01 00:00:00 ~ 9999-12-31 23:59:59 |
| timestamp | 4 bytes | 1970-01-01 00:00:00 ~ 2038-01-19 03:14:07 |

某次执行结果打印

---------------------------------------------源MySQL建表语句--------------------------------------------
CREATE TABLE `yyy_inf` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
  `v01` tinyint NOT NULL DEFAULT '-12' COMMENT '值2',
  `v02` tinyint unsigned NOT NULL DEFAULT '12' COMMENT '值2',
  `v011` smallint NOT NULL DEFAULT '-12' COMMENT '值2',
  `v012` smallint unsigned NOT NULL DEFAULT '12' COMMENT '值2',
  `v0123` mediumint unsigned NOT NULL DEFAULT '12' COMMENT '值2',
  `v1` int unsigned NOT NULL DEFAULT '1234' COMMENT '值1',
  `v2` int NOT NULL DEFAULT '-12' COMMENT '值2',
  `v3` bigint NOT NULL DEFAULT '-1234' COMMENT '值3',
  `v4` decimal(9,2) NOT NULL DEFAULT '-12.34' COMMENT '值4',
  `v41` decimal(10,0) NOT NULL COMMENT '值4',
  `v5` decimal(9,2) unsigned NOT NULL DEFAULT '12.34' COMMENT '值5',
  `v6` float NOT NULL DEFAULT '-12.34' COMMENT '值6',
  `v7` float unsigned NOT NULL DEFAULT '12.34' COMMENT '值7',
  `v8` double unsigned NOT NULL DEFAULT '12.34' COMMENT '值8',
  `v9` double NOT NULL DEFAULT '-12.34' COMMENT '值9',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  `delete_flag` tinyint unsigned NOT NULL DEFAULT '0' COMMENT '逻辑删除标识:1=删除,0=未删',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='y信息'
----------------------------------------------HIVE建表语句---------------------------------------------
CREATE EXTERNAL TABLE `ods_mysql_yyy_inf_full`(
  `id` BIGINT COMMENT '主键',
  `v01` TINYINT COMMENT '值2',
  `v02` SMALLINT COMMENT '值2',
  `v011` SMALLINT COMMENT '值2',
  `v012` INT COMMENT '值2',
  `v0123` INT COMMENT '值2',
  `v1` BIGINT COMMENT '值1',
  `v2` INT COMMENT '值2',
  `v3` BIGINT COMMENT '值3',
  `v4` DECIMAL(9,2) COMMENT '值4',
  `v41` DECIMAL(10,0) COMMENT '值4',
  `v5` DECIMAL(9,2) COMMENT '值5',
  `v6` FLOAT COMMENT '值6',
  `v7` FLOAT COMMENT '值7',
  `v8` DOUBLE COMMENT '值8',
  `v9` DOUBLE COMMENT '值9',
  `create_time` STRING COMMENT '创建时间',
  `update_time` STRING COMMENT '修改时间',
  `delete_flag` SMALLINT COMMENT '逻辑删除标识:1=删除,0=未删'
) COMMENT 'y信息'
PARTITIONED BY (`ymd` STRING);

补充:ADS层数据导出到MySQL,自动生成HIVE表对应MySQL的建表语句

"""
通过ADS层建表语句,生成MySQL建表语句
ADS层不分区,不使用压缩,行存储
"""
from sql import read_file
import re

# Hive DDL of the ADS-layer tables
text = read_file('hive/ads.sql')
# Drop the EXTERNAL keyword
text = re.sub(r'\bEXTERNAL\b', '', text)
# Map Hive STRING columns to MySQL text
text = re.sub(r'\bSTRING\b', 'text', text)
# Strip the ads_ prefix only where a name starts with it, so identifiers that
# merely contain "ads_" (e.g. `uploads_cnt`) are left intact
text = re.sub(r'\bads_', '', text)
print(text)
-- 源HIVE建表语句
CREATE EXTERNAL TABLE ads_purchase_order_info (
`prch_order_id` STRING COMMENT '采购订单头id',
`prch_item_id` STRING COMMENT '采购订单行id',
`exfactory_total_price` DOUBLE COMMENT '出厂价总额',
`insert_time` STRING COMMENT '数据插入日期'
) COMMENT '采购信息';

-- 生成的MySQL建表语句
CREATE  TABLE purchase_order_info (
`prch_order_id` text COMMENT '采购订单头id',
`prch_item_id` text COMMENT '采购订单行id',
`exfactory_total_price` DOUBLE COMMENT '出厂价总额',
`insert_time` text COMMENT '数据插入日期'
) COMMENT '采购信息';

以上是关于MySQL数据同步到HIVE,自动生成MySQL表对应HIVE的建表语句的主要内容,如果未能解决你的问题,请参考以下文章

sqoop从hive导入数据到mysql时出现主键冲突

大数据NiFi(二十):实时同步MySQL数据到Hive

Mysql 上亿级数据导入Hive思路分享

mysql同步数据到hive---binlog方式

怎样用sqoop把navicat for mysql 里的表导入到hive中

在idea上链接hive 并将mysql上的数据抽取到hive表中