执行HIVE通用脚本Python实现

Posted 小基基o_O

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了执行HIVE通用脚本Python实现相关的知识,希望对你有一定的参考价值。

场景

  • HIVE离线计算(T+1)

用法

  1. 先把SQL写好
  2. 再把SQL复制到脚本中的sql
  3. 如果有动态日期,就要将动态日期换成{ymd},例如:将'2021-10-24'换成'{ymd}'
  4. 动态日期默认昨天,可通过传参来修改

创建可执行脚本

touch hive_e.py
chmod 777 hive_e.py
vim hive_e.py

通用脚本Python语言实现

#!/usr/bin/python
# coding:utf-8
import os
import platform
from sys import argv
from time import time, strftime
from datetime import date, timedelta


class Executor:
    def __init__(self):
        self.t = time()  # 起始秒数
        self.system = platform.system()  # 操作系统

    def __del__(self):
        if self.system == 'Windows':
            print('结束时间:' + self.dark_cyan(self.now()))
            print('运行时间:' + self.dark_cyan(self))
        else:
            print('END TIME: {}'.format(self.now()))
            print('RUN TIME: {}'.format(self))

    def __str__(self):
        t = self.second
        if t < 60:
            return '%.2fs' % t
        elif t < 3600:
            return '%.2fmin' % (t / 60)
        else:
            return '%.2fh' % (t / 3600)

    @property
    def second(self):
        """程序运行秒数"""
        return time() - self.t

    @staticmethod
    def yellow(text):
        print('\\033[93m{}\\033[0m'.format(text))

    @staticmethod
    def dark_cyan(text):
        return '\\033[36m{}\\033[0m'.format(text)

    @staticmethod
    def now():
        return strftime('%Y-%m-%d %H:%M:%S')

    @staticmethod
    def yesterday():
        yesterday = date.today() - timedelta(days=1)
        return yesterday.strftime('%Y-%m-%d')

    def debug(self, text):
        """仅在Windows上打印"""
        if self.system == 'Windows':
            print('DEBUG', self.dark_cyan(self.now()))
            self.yellow(text)

    def info(self, text):
        if self.system == 'Windows':
            print('INFO {}'.format(self.dark_cyan(self.now())))
            self.yellow(text)
        else:
            print('INFO {}'.format(self.now()))
            print(text)

    def get_parameter_ymd(self):
        """获取日期参数,默认昨天(T+1)"""
        self.debug(argv)
        if len(argv) == 2:
            return argv[1]
        else:
            return self.yesterday()

    def execute(self, cmd):
        """在Windows上打印,在Linux上执行"""
        self.info(cmd)
        if self.system == 'Linux':
            os.system(cmd)


e = Executor()

# 默认配置
QUEUE = '队列名'  # 队列名
JOB = '任务名'  # 脚本中的SQL生成的job会用同一个名字
DATABASE = '库名'  # 库名
THREAD_NUMBER = 4  # 并行度

# 动态日期参数
ymd = e.get_parameter_ymd()

# 设置
set_queue = 'set mapred.job.queue.name={};'.format(QUEUE)
set_job = 'set mapred.job.name={};'.format(JOB)
set_parallel = '''
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number={};
'''.format(THREAD_NUMBER).strip()
use_database = 'use {};'.format(DATABASE)
set_dynamic_partition = '''
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
'''.strip()

# SQL
sql = '''


'''.format(ymd=ymd).strip()
e.debug(sql)

# 执行HIVE的完整命令
hive_e = '''
hive -e "
{set_queue}
{set_job}
{set_parallel}
{set_dynamic_partition}
{use_database}
{sql}
"
'''.format(
    set_queue=set_queue,
    set_job=set_job,
    use_database=use_database,
    set_parallel=set_parallel,
    set_dynamic_partition=set_dynamic_partition,
    sql=sql
).strip()

# 执行
e.execute(hive_e)

执行语法

./hive_e.py
./hive_e.py 2021-10-24

以上是关于执行HIVE通用脚本Python实现的主要内容,如果未能解决你的问题,请参考以下文章

Eclipse 中的通用代码片段或模板

用Python实现邮件发送Hive明细数据

常用python日期日志获取内容循环的代码片段

shell 脚本执行日志通用模块

如何找到Hive提交的SQL相对应的Yarn程序的applicationId

Android 逆向Android 逆向通用工具开发 ( Android 平台运行的 cmd 程序类型 | Android 平台运行的 cmd 程序编译选项 | 编译 cmd 可执行程序 )(代码片段