执行HIVE通用脚本Python实现
Posted 小基基o_O
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了执行HIVE通用脚本Python实现相关的知识,希望对你有一定的参考价值。
场景
- HIVE离线计算(T+1)
用法
- 先把SQL写好
- 再把SQL复制到脚本中的
sql
- 如果有动态日期,就要将动态日期换成
{ymd}
,例如:将'2021-10-24'
换成'{ymd}'
- 动态日期默认昨天,可通过传参来修改
创建可执行脚本
touch hive_e.py
chmod 777 hive_e.py
vim hive_e.py
通用脚本Python语言实现
#!/usr/bin/python
# coding:utf-8
import os
import platform
from sys import argv
from time import time, strftime
from datetime import date, timedelta
class Executor:
def __init__(self):
self.t = time() # 起始秒数
self.system = platform.system() # 操作系统
def __del__(self):
if self.system == 'Windows':
print('结束时间:' + self.dark_cyan(self.now()))
print('运行时间:' + self.dark_cyan(self))
else:
print('END TIME: {}'.format(self.now()))
print('RUN TIME: {}'.format(self))
def __str__(self):
t = self.second
if t < 60:
return '%.2fs' % t
elif t < 3600:
return '%.2fmin' % (t / 60)
else:
return '%.2fh' % (t / 3600)
@property
def second(self):
"""程序运行秒数"""
return time() - self.t
@staticmethod
def yellow(text):
print('\\033[93m{}\\033[0m'.format(text))
@staticmethod
def dark_cyan(text):
return '\\033[36m{}\\033[0m'.format(text)
@staticmethod
def now():
return strftime('%Y-%m-%d %H:%M:%S')
@staticmethod
def yesterday():
yesterday = date.today() - timedelta(days=1)
return yesterday.strftime('%Y-%m-%d')
def debug(self, text):
"""仅在Windows上打印"""
if self.system == 'Windows':
print('DEBUG', self.dark_cyan(self.now()))
self.yellow(text)
def info(self, text):
if self.system == 'Windows':
print('INFO {}'.format(self.dark_cyan(self.now())))
self.yellow(text)
else:
print('INFO {}'.format(self.now()))
print(text)
def get_parameter_ymd(self):
"""获取日期参数,默认昨天(T+1)"""
self.debug(argv)
if len(argv) == 2:
return argv[1]
else:
return self.yesterday()
def execute(self, cmd):
"""在Windows上打印,在Linux上执行"""
self.info(cmd)
if self.system == 'Linux':
os.system(cmd)
e = Executor()
# 默认配置
QUEUE = '队列名' # 队列名
JOB = '任务名' # 脚本中的SQL生成的job会用同一个名字
DATABASE = '库名' # 库名
THREAD_NUMBER = 4 # 并行度
# 动态日期参数
ymd = e.get_parameter_ymd()
# 设置
set_queue = 'set mapred.job.queue.name={};'.format(QUEUE)
set_job = 'set mapred.job.name={};'.format(JOB)
set_parallel = '''
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number={};
'''.format(THREAD_NUMBER).strip()
use_database = 'use {};'.format(DATABASE)
set_dynamic_partition = '''
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
'''.strip()
# SQL
sql = '''
'''.format(ymd=ymd).strip()
e.debug(sql)
# 执行HIVE的完整命令
hive_e = '''
hive -e "
{set_queue}
{set_job}
{set_parallel}
{set_dynamic_partition}
{use_database}
{sql}
"
'''.format(
set_queue=set_queue,
set_job=set_job,
use_database=use_database,
set_parallel=set_parallel,
set_dynamic_partition=set_dynamic_partition,
sql=sql
).strip()
# 执行
e.execute(hive_e)
执行语法
./hive_e.py
./hive_e.py 2021-10-24
以上是关于执行HIVE通用脚本Python实现的主要内容,如果未能解决你的问题,请参考以下文章
如何找到Hive提交的SQL相对应的Yarn程序的applicationId
Android 逆向Android 逆向通用工具开发 ( Android 平台运行的 cmd 程序类型 | Android 平台运行的 cmd 程序编译选项 | 编译 cmd 可执行程序 )(代码片段