Batch Querying the Size and Row Count of All Hive Tables
Posted by 小基基o_O
Overview
- Requirements:
  1. Batch query the size and row count of Hive tables
  2. Batch query the file count and partition count of Hive tables
- Approach: use a program (Python 2 in this article) to iterate over the Hive tables and run DESC FORMATTED on each one to collect its status information.
- We usually do not need every table, only specific ones. Two ways to filter (a sketch of the first follows this list):
  1. Adopt a table-naming convention, then filter with USE <db> + SHOW TABLES + LIKE
  2. Filter in MySQL (where the Hive metastore is stored)
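A minimal sketch of the first approach; the database name and wildcard pattern below are placeholders taken from the example tables in the appendix:
-- Placeholder database and pattern; SHOW TABLES in Hive accepts * and | wildcards
USE ec;
SHOW TABLES LIKE 'dwd_*';
The second approach is what the metastore queries below implement.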
Batch-fetching basic Hive table information
Run the following against the Hive metastore database in MySQL:
SELECT
    b.`NAME`,        -- database name
    t.`TBL_NAME`,    -- table name
    t.`TBL_TYPE`,    -- table type
    c.`PARAM_VALUE`, -- table comment
    s.`LOCATION`     -- HDFS path of the table
FROM `DBS` b
INNER JOIN `TBLS` t ON t.`DB_ID`=b.`DB_ID`
INNER JOIN `SDS` s ON s.`SD_ID`=t.`SD_ID`
LEFT JOIN (
    SELECT `TBL_ID`,`PARAM_VALUE`
    FROM `TABLE_PARAMS`
    WHERE `PARAM_KEY`='comment'
) c ON c.`TBL_ID`=t.`TBL_ID`
WHERE t.`TBL_TYPE`='MANAGED_TABLE' OR t.`TBL_TYPE`='EXTERNAL_TABLE';
Detailed information
SELECT
    b.`NAME`               AS `db_name`        -- database name
   ,t.`TBL_NAME`           AS `table_name`     -- table name
   ,t.`TBL_TYPE`           AS `table_type`     -- table type
   ,c.`PARAM_VALUE`        AS `table_comment`  -- table comment
   ,s.`LOCATION`           AS `hdfs_location`  -- HDFS path of the table
   ,t.`VIEW_EXPANDED_TEXT` AS `view_sql`       -- expanded SQL text, for views
   ,t.`OWNER`              AS `owner`          -- table owner
   ,t.`OWNER_TYPE`         AS `owner_type`     -- type of the owner
   ,s.`INPUT_FORMAT`       AS `input_format`   -- input format
   ,s.`OUTPUT_FORMAT`      AS `output_format`  -- output format
-- ,s.`NUM_BUCKETS`        AS `num_buckets`    -- number of buckets
-- ,s.`IS_COMPRESSED`      AS `is_compressed`  -- whether compressed
FROM `DBS` b
INNER JOIN `TBLS` t ON t.`DB_ID`=b.`DB_ID`
INNER JOIN `SDS` s ON s.`SD_ID`=t.`SD_ID`
LEFT JOIN (
    SELECT `TBL_ID`,`PARAM_VALUE`
    FROM `TABLE_PARAMS`
    WHERE `PARAM_KEY`='comment'
) c ON c.`TBL_ID`=t.`TBL_ID`
WHERE t.`TBL_TYPE`='MANAGED_TABLE' OR t.`TBL_TYPE`='EXTERNAL_TABLE';
Querying the size and row count of a single Hive table
DESC FORMATTED db_name.table_name;
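Note that numRows and rawDataSize in this output come from Hive's table statistics, which can be missing or stale (the ods_pv row in the sample results at the end shows 0 for both). If needed, they can usually be refreshed with ANALYZE TABLE; a hedged sketch, using the document's db_name.table_name placeholder and the ymd partition column from the appendix:
-- Recompute basic statistics so that numRows / rawDataSize are populated
ANALYZE TABLE db_name.table_name COMPUTE STATISTICS;
-- For a partitioned table, per-partition statistics can be computed as well, e.g.
-- ANALYZE TABLE db_name.table_name PARTITION(ymd) COMPUTE STATISTICS;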
Code
Create a MySQL table to store the query results
USE data_management;
CREATE TABLE hive_table_status(
    update_time    TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    table_name     VARCHAR(255) NOT NULL COMMENT "Hive table, as db_name.table_name",
    num_partitions INT DEFAULT NULL COMMENT "number of partitions",
    raw_data_size  BIGINT DEFAULT NULL COMMENT "raw data size",
    total_size     BIGINT DEFAULT NULL COMMENT "total size (excluding replicas), in bytes",
    num_rows       BIGINT DEFAULT NULL COMMENT "number of rows",
    num_files      INT DEFAULT NULL COMMENT "number of files",
    PRIMARY KEY(table_name)
) COMMENT "Hive table status information";
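The Python script below fills this table with one row per Hive table via REPLACE INTO; roughly, each write looks like the following sketch (values taken from the sample results in the appendix; update_time falls back to its default):
REPLACE INTO hive_table_status
    (table_name, num_partitions, raw_data_size, total_size, num_rows, num_files)
VALUES ('ec.dwd_pv', 9, 519000, 17047, 3000, 16);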
Python 2
# coding:utf-8
from subprocess import check_output
from time import time, strftime
from re import findall

# MySQL database that holds the Hive metastore
META_STORE = 'hive'
# MySQL database and table where the results are stored
MYSQL_DB = 'data_management'
MYSQL_TB = 'hive_table_status'
# SQL that lists the Hive tables to inspect
SQL = '''
SELECT
    b.`NAME` AS db_name,     -- database name
    t.`TBL_NAME` AS tb_name  -- table name
FROM `DBS` b
INNER JOIN `TBLS` t ON b.`DB_ID`=t.`DB_ID`
WHERE 1=1
'''
HIVE_DB = ''  # filter Hive tables by database; empty string means no filtering
EXTERNAL_TABLE = False  # whether to keep only external tables


class Executor:
    def __init__(self, **kwargs):
        # start time, in seconds
        self.t = time()
        # keyword arguments: https://yellow520.blog.csdn.net/article/details/122088401
        self.parameters = kwargs

    def __del__(self):
        print 'finish time: {}'.format(self.now)
        print 'elapsed: {}'.format(self)

    def __str__(self):
        t = self.second
        if t < 3600:
            return '%.2fmin' % (t / 60)
        else:
            return '%.2fh' % (t / 3600)

    @property
    def second(self):
        """Seconds the program has been running"""
        return time() - self.t

    @property
    def now(self):
        return strftime('%Y-%m-%d %H:%M:%S')

    @staticmethod
    def sh(cmd):
        """Run a shell command and return its stripped output"""
        return check_output(cmd, shell=True).strip()


class Mysql(Executor):
    def e(self, sql):
        """Run SQL through the mysql command-line client"""
        host = self.parameters.get('host', 'localhost')
        user = self.parameters.get('user', 'root')
        password = self.parameters.get('password')
        database = self.parameters.get('database')
        sql = sql.replace("'", '"')
        return self.sh("mysql -h'{}' -u'{}' -p'{}' -D'{}' -e'{}'".format(
            host, user, password, database, sql))

    def fetch_dt(self, sql):
        """Yield each result row as a dict of column name -> value"""
        lines = self.e(sql).split('\n')
        if len(lines) > 1:
            columns = lines[0].split('\t')
            col_num = len(columns)
            for line in lines[1:]:
                cells = line.split('\t')
                yield {columns[i]: cells[i] for i in range(col_num)}

    def replace(self, dt, tb):
        """REPLACE INTO table tb using the non-null items of dict dt"""
        ls = [(k, v) for k, v in dt.items() if v is not None]
        sql = 'REPLACE INTO %s (' % tb + ','.join(i[0] for i in ls) + \
              ') VALUES (' + ','.join('%r' % i[1] for i in ls) + ')'
        self.e(sql)


class Hive(Executor):
    def e(self, sql):
        """Run SQL through the hive command-line client"""
        return self.sh("hive -e '{}'".format(sql))


class Monitor:
    def __init__(self, mysql_pwd):
        self.mysql_hive_meta_store = Mysql(password=mysql_pwd, database=META_STORE)
        self.mysql_db = Mysql(password=mysql_pwd, database=MYSQL_DB)
        self.hive = Hive()
        self.hive_result = ''

    def find_all(self, pattern):
        """Return the first match of pattern in the DESC FORMATTED output (as int if numeric)"""
        a = findall(pattern, self.hive_result)
        if a:
            a = a[0]
            if a.isdigit():
                return int(a)
            return a
        return None

    def select_hive_tables(self):
        """Yield 'db_name.table_name' for every Hive table that passes the filters"""
        sql = SQL
        if HIVE_DB:
            sql += ' AND b.`NAME`="{hive_db}"'.format(hive_db=HIVE_DB)
        if EXTERNAL_TABLE is True:
            sql += ' AND `TBL_TYPE`="EXTERNAL_TABLE"'
        for dt in self.mysql_hive_meta_store.fetch_dt(sql):
            yield '{}.{}'.format(dt['db_name'], dt['tb_name'])

    def desc_formatted(self, table):
        """Run DESC FORMATTED and extract the table statistics"""
        sql = 'DESC FORMATTED {};'.format(table)
        self.hive_result = self.hive.e(sql)
        return {
            'table_name': table,
            'num_partitions': self.find_all(r'numPartitions\s+([0-9]+)'),
            'raw_data_size': self.find_all(r'rawDataSize\s+([0-9]+)'),
            'total_size': self.find_all(r'totalSize\s+([0-9]+)'),
            'num_rows': self.find_all(r'numRows\s+([0-9]+)'),
            'num_files': self.find_all(r'numFiles\s+([0-9]+)'),
        }

    def main(self):
        """Write every table's status into MySQL"""
        for tb in self.select_hive_tables():
            self.mysql_db.replace(self.desc_formatted(tb), MYSQL_TB)

    def main2(self):
        """Print every table's status, with a few derived averages"""
        ls = []
        for tb in self.select_hive_tables():
            dt = self.desc_formatted(tb)
            # average file size, in KB
            if dt['total_size'] and dt['num_files']:
                dt['avg_file_kb'] = 1. * dt['total_size'] / dt['num_files'] / 1024
            # average partition size, in KB
            if dt['total_size'] and dt['num_partitions']:
                dt['avg_partition_kb'] = 1. * dt['total_size'] / dt['num_partitions'] / 1024
            # average number of files per partition
            if dt['num_files'] and dt['num_partitions']:
                dt['avg_partition_num_files'] = 1. * dt['num_files'] / dt['num_partitions']
            ls.append(dt)
        for dt in ls:
            print dt


if __name__ == '__main__':
    Monitor(mysql_pwd='password').main()
Querying the results
USE hive;
SELECT
    b.`NAME`,           -- database name
    t.`TBL_NAME`,       -- table name
    t.`TBL_TYPE`,       -- table type
    c.`PARAM_VALUE`,    -- table comment
    s.`LOCATION`,       -- HDFS path of the table
    d.`update_time`,    -- when the table's status was last refreshed
    d.`num_partitions`, -- number of partitions
    d.`raw_data_size`,  -- raw data size
    d.`total_size`,     -- total size (excluding replicas)
    d.`num_rows`,       -- number of rows
    d.`num_files`,      -- number of files
    d.`total_size`/d.`num_files`/1024/1024 AS `avg_file_mb`,           -- average file size, in MB
    d.`total_size`/d.`num_partitions`/1024/1024 AS `avg_partition_mb`, -- average partition size, in MB
    d.`num_files`/d.`num_partitions` AS `avg_partition_num_files`      -- average number of files per partition
FROM `DBS` b
INNER JOIN `TBLS` t ON t.`DB_ID`=b.`DB_ID`
INNER JOIN `SDS` s ON s.`SD_ID`=t.`SD_ID`
LEFT JOIN (
    SELECT `TBL_ID`,`PARAM_VALUE`
    FROM `TABLE_PARAMS`
    WHERE `PARAM_KEY`='comment'
) c ON c.`TBL_ID`=t.`TBL_ID`
LEFT JOIN data_management.hive_table_status d
    ON CONCAT(b.`NAME`,'.',t.`TBL_NAME`)=d.`table_name`;
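As a hypothetical follow-up, the stored values also make it easy to spot tables with a small-file problem; a minimal sketch against the result table defined above:
-- Tables averaging less than 1 MB per file (possible small-file problem)
SELECT table_name,
       num_files,
       total_size/num_files/1024/1024 AS avg_file_mb
FROM data_management.hive_table_status
WHERE num_files > 0
  AND total_size/num_files/1024/1024 < 1
ORDER BY avg_file_mb;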
Appendix
en | 🔉 | meaning |
---|---|---|
accurate | ˈækjərət | precise; exact |
SerDe | Serialize Deserialize | serialization and deserialization |
transient | ˈtrænʃnt | adj. fleeting, short-lived; n. a person staying only briefly |
compressed | kəmˈprest | adj. compressed, flattened; v. pressed together |
ORC | Optimized Row Columnar | a columnar file format for Hive in the Hadoop ecosystem |
columnar | kəˈlʌmnər | adj. columnar; arranged in columns |
snappy | ˈsnæpi | concise; brisk |
Sample output of one DESC FORMATTED run
# col_name data_type comment
user_id string 用户ID
page_id string 页面ID
# Partition Information
# col_name data_type comment
ymd string
# Detailed Table Information
Database: ec
OwnerType: USER
Owner: yellow
CreateTime: Sun Mar 13 11:21:47 CST 2022
LastAccessTime: UNKNOWN
Retention: 0
Location: hdfs://hadoop105:8020/user/hive/warehouse/ec.db/dwd/pv
Table Type: EXTERNAL_TABLE
Table Parameters:
COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"}
EXTERNAL TRUE
bucketing_version 2
comment 页面浏览
numFiles 16
numPartitions 9
numRows 3000
orc.compress snappy
rawDataSize 519000
totalSize 17047
transient_lastDdlTime 1647141707
# Storage Information
SerDe Library: org.apache.hadoop.hive.ql.io.orc.OrcSerde
InputFormat: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
Compressed: No
Num Buckets: -1
Bucket Columns: []
Sort Columns: []
Storage Desc Params:
serialization.format 1
Time taken: 0.184 seconds, Fetched: 40 row(s)
Sample computed results
+------------+----------------+----------------+---------------+------------+----------+-----------+-------------+
| table_name | TBL_TYPE | num_partitions | raw_data_size | total_size | num_rows | num_files | avg_file_kb |
+------------+----------------+----------------+---------------+------------+----------+-----------+-------------+
| ec.ods_pv | EXTERNAL_TABLE | 9 | 0 | 402845 | 0 | 132 | 2.98032818 |
| ec.dwd_pv | EXTERNAL_TABLE | 9 | 519000 | 17047 | 3000 | 16 | 1.04046631 |