运行Sqoop任务的通用脚本Python2实现(待优化)

Posted 小基基o_O

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了运行Sqoop任务的通用脚本Python2实现(待优化)相关的知识,希望对你有一定的参考价值。

本想用Bash?结果被Shell的正则表达式恶心到了!还是Python

思想

把【配置、sqoop、mysql】三者分离

mysql_to_hdfs.py

用法:把一大堆SQL写进_queries,SQL之间用双换行分隔,日期用ymd
注意:SQL的日期要用双引号,SQL注释要顶格

#!/usr/bin/python2
"""
Usage
    mysql_to_hdfs.py [ymd]
e.g.
    mysql_to_hdfs.py
    mysql_to_hdfs.py 2021-01-01
"""
import os, sys, datetime, re

# constant
IP_ADDR = 'hadoop100'
BASENAME = 'b0'
USERNAME = 'root'
PASSWORD = '123456'
HDFS = '/h0/%s' % BASENAME
QUEUE_NAME = ''
JOB_NAME = ''

# yesterday
yesterday = datetime.date.today() - datetime.timedelta(days=1)

# command line parameters
a = sys.argv
if len(a) > 1:
    ymd = datetime.datetime.strptime(a[1], '%Y-%m-%d').strftime('%Y-%m-%d')
else:
    ymd = yesterday.strftime('%Y-%m-%d')


def os_system(s):
    if os.name == 'posix':
        os.system(' '.join(s.split()))
    else:
        print(' '.join(s.split()))


def remove_mysql_comments(mysql):
    mysql = re.sub('^#.+', '', mysql)
    mysql = re.sub('^-- .+', '', mysql)
    mysql = re.sub(r'^/\\*[\\s\\S]*\\*/', '', mysql)
    return ' '.join(mysql.split('\\n'))


def get_tb_from_sql(query):
    tb = re.findall(r'FROM\\s+([a-z_0-9]+)\\s+', query, re.I)
    return tb[0]


def sqoop_import(query):
    query = remove_mysql_comments(query)
    tb_name = get_tb_from_sql(query)
    hdfs = '%s/%s/%s' % (HDFS, tb_name, ymd)
    return r'''
    sqoop import
    -D mapred.job.queue.name=queue_name
    -D mapred.job.name=job_name
    --connect jdbc:mysql://ip_addr:3306/basename
    --username username --password password
    --delete-target-dir --target-dir hdfs
    --query 'query AND $CONDITIONS'
    --null-string '\\\\N' --null-non-string '\\\\N'
    --num-mappers 1
    '''.format(
        queue_name=QUEUE_NAME,
        job_name=JOB_NAME,
        ip_addr=IP_ADDR,
        basename=BASENAME,
        username=USERNAME,
        password=PASSWORD,
        hdfs=hdfs,
        query=query,
    )


def sqoop_imports(queries):
    queries = re.split('\\n2,', queries.strip())
    for query in queries:
        if 'WHERE' not in query:
            query += ' WHERE 1=1 '
        os_system(sqoop_import(query))


_queries = '''
#full
SELECT * FROM t1

-- append
SELECT * FROM t2 WHERE DATE_FORMAT(create_time,"%Y-%m-%d")="ymd"

/*
append
amend
*/
SELECT * FROM t3
WHERE (DATE_FORMAT(create_time,"%Y-%m-%d")="ymd"
OR DATE_FORMAT(update_time,"%Y-%m-%d")="ymd")
'''.format(ymd=ymd)
sqoop_imports(_queries)

ads_to_mysql.py

用法:把一大堆【表名作为更新标识的列名】写进_lines,用换行分隔

#!/usr/bin/python2
import os

# constant
IP_ADDR = 'hadoop100'
BASENAME = 'b0'
USERNAME = 'root'
PASSWORD = '123456'
HDFS = '/%s/ads_' % BASENAME
QUEUE_NAME = ''
JOB_NAME = ''


def os_system(s):
    if os.name == 'posix':
        os.system(' '.join(s.split()))
    else:
        print(' '.join(s.split()))


def sqoop_export(tb_name, update_key=''):
    hdfs = HDFS + tb_name
    if update_key:
        update_key = '--update-mode allowinsert --update-key ' + update_key
    return r'''
    sqoop export
    -D mapred.job.queue.name=queue_name
    -D mapred.job.name=job_name
    --connect jdbc:mysql://ip_addr:3306/basename
    --username username --password password
    --export-dir hdfs
    --table tb_name
    update_key
    --input-null-string '\\\\N' --input-null-non-string '\\\\N'
    --num-mappers 1
    '''.format(
        queue_name=QUEUE_NAME,
        job_name=JOB_NAME,
        ip_addr=IP_ADDR,
        basename=BASENAME,
        username=USERNAME,
        password=PASSWORD,
        hdfs=hdfs,
        tb_name=tb_name,
        update_key=update_key
    )


def main(lines):
    for line in lines.strip().split('\\n'):
        os_system(sqoop_export(*line.strip().split()))


_lines = '''
t1
t2 create_time
t3 create_time,update_time
'''
main(_lines)

待优化

开发、测试、生产 环境 自动切换

https://yellow520.blog.csdn.net/article/details/121105488

待添加的参数

Sqoop的底层运行任务是MR中Map,默认是4个MapTask
此时涉及到MapTask在YARN上的队列名和作业名

-D mapred.job.queue.name
-D mapred.job.name

压缩

--compress
--compression-codec lzop

待添加的模式

--hive-import

--hive-import先import到HDFS再LOAD到HIVE有啥区别?
--hive-import更直接,代码更少,支持压缩,除此之外:更快?支持并行?支持列式存储?

安全问题

通常密码这种东西不应该明文写到脚本
优化方案:将密码等配置写入文件,并上传到HDFS,脚本通过读取配置来获取密码

以上是关于运行Sqoop任务的通用脚本Python2实现(待优化)的主要内容,如果未能解决你的问题,请参考以下文章

《懒人Shell脚本》之六——一键构造待采集批量sql语句

为啥我在 Azkaban 中的 Sqoop 任务在选择列后卡住了?

Oozie sqoop 任务错误:无法运行程序“hive”:错误=2 没有这样的文件或目录

sqoop安装及使用

shell命令-e未找到命令

监控HDFS每天数据增量(Python2实现)