运行Sqoop任务的通用脚本Python2实现(待优化)
Posted 小基基o_O
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了运行Sqoop任务的通用脚本Python2实现(待优化)相关的知识,希望对你有一定的参考价值。
本想用Bash
?结果被Shell的正则表达式恶心到了!还是Python
好
思想
把【配置、sqoop、mysql】三者分离
mysql_to_hdfs.py
用法:把一大堆SQL写进
_queries
,SQL之间用双换行分隔,日期用ymd
注意:SQL的日期要用双引号,SQL注释要顶格
#!/usr/bin/python2
"""
Usage
mysql_to_hdfs.py [ymd]
e.g.
mysql_to_hdfs.py
mysql_to_hdfs.py 2021-01-01
"""
import os, sys, datetime, re
# constant
IP_ADDR = 'hadoop100'
BASENAME = 'b0'
USERNAME = 'root'
PASSWORD = '123456'
HDFS = '/h0/%s' % BASENAME
QUEUE_NAME = ''
JOB_NAME = ''
# yesterday
yesterday = datetime.date.today() - datetime.timedelta(days=1)
# command line parameters
a = sys.argv
if len(a) > 1:
ymd = datetime.datetime.strptime(a[1], '%Y-%m-%d').strftime('%Y-%m-%d')
else:
ymd = yesterday.strftime('%Y-%m-%d')
def os_system(s):
if os.name == 'posix':
os.system(' '.join(s.split()))
else:
print(' '.join(s.split()))
def remove_mysql_comments(mysql):
mysql = re.sub('^#.+', '', mysql)
mysql = re.sub('^-- .+', '', mysql)
mysql = re.sub(r'^/\\*[\\s\\S]*\\*/', '', mysql)
return ' '.join(mysql.split('\\n'))
def get_tb_from_sql(query):
tb = re.findall(r'FROM\\s+([a-z_0-9]+)\\s+', query, re.I)
return tb[0]
def sqoop_import(query):
query = remove_mysql_comments(query)
tb_name = get_tb_from_sql(query)
hdfs = '%s/%s/%s' % (HDFS, tb_name, ymd)
return r'''
sqoop import
-D mapred.job.queue.name=queue_name
-D mapred.job.name=job_name
--connect jdbc:mysql://ip_addr:3306/basename
--username username --password password
--delete-target-dir --target-dir hdfs
--query 'query AND $CONDITIONS'
--null-string '\\\\N' --null-non-string '\\\\N'
--num-mappers 1
'''.format(
queue_name=QUEUE_NAME,
job_name=JOB_NAME,
ip_addr=IP_ADDR,
basename=BASENAME,
username=USERNAME,
password=PASSWORD,
hdfs=hdfs,
query=query,
)
def sqoop_imports(queries):
queries = re.split('\\n2,', queries.strip())
for query in queries:
if 'WHERE' not in query:
query += ' WHERE 1=1 '
os_system(sqoop_import(query))
_queries = '''
#full
SELECT * FROM t1
-- append
SELECT * FROM t2 WHERE DATE_FORMAT(create_time,"%Y-%m-%d")="ymd"
/*
append
amend
*/
SELECT * FROM t3
WHERE (DATE_FORMAT(create_time,"%Y-%m-%d")="ymd"
OR DATE_FORMAT(update_time,"%Y-%m-%d")="ymd")
'''.format(ymd=ymd)
sqoop_imports(_queries)
ads_to_mysql.py
用法:把一大堆【表名和作为更新标识的列名】写进
_lines
,用换行分隔
#!/usr/bin/python2
import os
# constant
IP_ADDR = 'hadoop100'
BASENAME = 'b0'
USERNAME = 'root'
PASSWORD = '123456'
HDFS = '/%s/ads_' % BASENAME
QUEUE_NAME = ''
JOB_NAME = ''
def os_system(s):
if os.name == 'posix':
os.system(' '.join(s.split()))
else:
print(' '.join(s.split()))
def sqoop_export(tb_name, update_key=''):
hdfs = HDFS + tb_name
if update_key:
update_key = '--update-mode allowinsert --update-key ' + update_key
return r'''
sqoop export
-D mapred.job.queue.name=queue_name
-D mapred.job.name=job_name
--connect jdbc:mysql://ip_addr:3306/basename
--username username --password password
--export-dir hdfs
--table tb_name
update_key
--input-null-string '\\\\N' --input-null-non-string '\\\\N'
--num-mappers 1
'''.format(
queue_name=QUEUE_NAME,
job_name=JOB_NAME,
ip_addr=IP_ADDR,
basename=BASENAME,
username=USERNAME,
password=PASSWORD,
hdfs=hdfs,
tb_name=tb_name,
update_key=update_key
)
def main(lines):
for line in lines.strip().split('\\n'):
os_system(sqoop_export(*line.strip().split()))
_lines = '''
t1
t2 create_time
t3 create_time,update_time
'''
main(_lines)
待优化
开发、测试、生产 环境 自动切换
https://yellow520.blog.csdn.net/article/details/121105488
待添加的参数
Sqoop的底层运行任务是MR中Map,默认是4个MapTask
此时涉及到MapTask在YARN上的队列名和作业名
-D mapred.job.queue.name
-D mapred.job.name
压缩
--compress
--compression-codec lzop
待添加的模式
--hive-import
--hive-import
和先import到HDFS再LOAD到HIVE
有啥区别?
--hive-import
更直接,代码更少,支持压缩,除此之外:更快?支持并行?支持列式存储?
安全问题
通常密码这种东西不应该明文写到脚本
优化方案:将密码等配置写入文件,并上传到HDFS,脚本通过读取配置来获取密码
以上是关于运行Sqoop任务的通用脚本Python2实现(待优化)的主要内容,如果未能解决你的问题,请参考以下文章
为啥我在 Azkaban 中的 Sqoop 任务在选择列后卡住了?