HIVE_SQL to SparkSQL: A Simple Example
For a work task I wrote a Hive SQL script to process some data. The partner side decided Hive would not do and asked for it to be converted to Spark. The first attempt was a direct conversion, which turned out to have problems; after two rounds of debugging the conversion was complete.
This is pure engineering code: no explanation of the underlying principles and no parameter tuning; the parameters are carried over from the Hive SQL version as-is.
1. Original Hive SQL code:
#!/usr/bin/env python
# -*-coding:utf-8 -*-
#insert_tables.py
#*******************************************************************************************
# ** File name:     insert_tables.py
# ** Description:
# **
# ** Notes:
# ** Input tables:  bx_mgsx_i2_h bx_mgsx_i3_h i5_h
# **
# ** Invocation:
# **   On demand:   python insert_tables.py day_id SOURCE_HIVE_DATABASE TARGET_HIVE_DATABASE
# ** Author:        yangmh
import os,sys
import datetime
import time
name = sys.argv[0][sys.argv[0].rfind(os.sep)+1:].rstrip('.py')
day_id = sys.argv[1]
SOURCE_HIVE_DATABASE = sys.argv[2]
TARGET_HIVE_DATABASE = sys.argv[3]
# set PYTHONPATH
current_path=os.path.dirname(os.path.abspath(__file__))
HIVE_DATABASE='yangmh'
hiveCmd='hive -e "'+'use '+ TARGET_HIVE_DATABASE + ';'
#source_table
source_i5_h=SOURCE_HIVE_DATABASE+".i5_h"
#des_table
des_i5_h_host_count_hour="i5_h_host_times"
###############################################################################
# create the target table
#i5_h_host_count
##############################################################################
def CreateLogTable():
    Createtables_des_i5_h_host_count_hour = "create table if not exists " + des_i5_h_host_count_hour + \
        " (user_id string, host_pv bigint, host_uv bigint)" \
        " PARTITIONED BY (hour_list string, day_id string)" \
        " row format delimited fields terminated by '\\t'"
    os.system(hiveCmd + Createtables_des_i5_h_host_count_hour + ';"')
###############################################################################
# load data into the partitioned table
##############################################################################
def LoadDataToTable(day_id):
    LoadDataSql = " set hive.exec.dynamic.partition=true;" \
        " set hive.exec.dynamic.partition.mode=nonstrict;" \
        " set hive.exec.max.dynamic.partitions=5000;" \
        " from (select user_id, domain_first_lvl," \
        " substr(p_hour,9,2) as hour_list, substr(p_hour,0,8) day_id," \
        " count(1) host_pv from " + source_i5_h + " where substr(p_hour,0,8)='" + day_id + "'" \
        " group by user_id, domain_first_lvl, substr(p_hour,9,2), substr(p_hour,0,8)) t" \
        " insert overwrite table " + des_i5_h_host_count_hour + " partition(hour_list, day_id)" \
        " select user_id, sum(host_pv) host_pv, count(1) host_uv, hour_list, day_id" \
        " group by user_id, hour_list, day_id;"
    print hiveCmd + LoadDataSql + '"'
    os.system(hiveCmd + LoadDataSql + '"')
# start of execution
try:
    #DropLogTable()
    CreateLogTable()
    LoadDataToTable(day_id)
except Exception,e:
    print e
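For reference, an invocation of this script might look like the following (the day_id value and the database names are illustrative; the yyyymmdd format is inferred from substr(p_hour,0,8)):

python insert_tables.py 20240101 source_db target_db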
2. First revision (local tests were fine, but on the customer's cluster it tried to read every database and failed with errors):
#!/usr/bin/env python
# -*-coding:utf-8 -*-
#insert_tables_spark.py
import os,sys
import datetime
import time
name = sys.argv[0][sys.argv[0].rfind(os.sep)+1:].rstrip('.py')
hour_id = sys.argv[1]
SOURCE_HIVE_DATABASE = sys.argv[2]
TARGET_HIVE_DATABASE = sys.argv[3]
# set PYTHONPATH
#current_path=os.path.dirname(os.path.abspath(__file__))
hiveCmd='spark-sql \
--master yarn \
--deploy-mode client \
--driver-memory 4G \
--executor-memory 4G \
--num-executors 10 \
-e "'+'use '+ TARGET_HIVE_DATABASE + ';'
#source_table
source_i5_h=SOURCE_HIVE_DATABASE+".i5_h"
#des_table
des_i5_h_host_count_hour_hour="i5_h_host_times_hour"
###############################################################################
# create the target table
#i5_h_host_count
##############################################################################
def CreateLogHourTable():
    Createtables_des_i5_h_host_count_hour = "create table if not exists " + des_i5_h_host_count_hour_hour + \
        " (user_id string, domain_first_lvl string, host_pv bigint)" \
        " PARTITIONED BY (day_id string, hour_id string)" \
        " row format delimited fields terminated by '\\t'" \
        " LOCATION 'hdfs://cluster/xxxx/xxx_hive_db/" + des_i5_h_host_count_hour_hour + "';"
    os.system(hiveCmd + Createtables_des_i5_h_host_count_hour + ';"')
###############################################################################
# load data into the partitioned table
##############################################################################
def LoadHourDataToTable(hour_id):
    LoadHourDataSql = " set hive.exec.dynamic.partition=true;" \
        " set hive.exec.dynamic.partition.mode=nonstrict;" \
        " set hive.exec.max.dynamic.partitions=5000;" \
        " insert overwrite table " + des_i5_h_host_count_hour_hour + " partition(day_id, hour_id)" \
        " select user_id, domain_first_lvl, count(1) host_pv, substr(p_hour,0,8) day_id, p_hour as hour_id" \
        " from " + source_i5_h + " where substr(p_hour,0,10)='" + hour_id + "'" \
        " group by user_id, domain_first_lvl, p_hour, substr(p_hour,0,8);"
    print hiveCmd + LoadHourDataSql + '"'
    os.system(hiveCmd + LoadHourDataSql + '"')
try:
    CreateLogHourTable()
    LoadHourDataToTable(hour_id)
except Exception,e:
    print e
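An invocation of this version might look like the following (the hour_id value and the database names are illustrative; the yyyymmddhh format is inferred from substr(p_hour,0,10)):

python insert_tables_spark.py 2024010112 source_db target_db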
3. Second revision:
#!/usr/bin/env python
# -*-coding:utf-8 -*-
#insert_tables_spark.py
#*******************************************************************************************
# ** File name:     insert_tables_spark.py
# ** Description:
# **
# ** Notes:
# ** Input table:   source_h
# ** Output table:  source_h_host_times_hour
# ** Invocation:
# **   On demand:   spark-submit --master yarn --deploy-mode client --driver-memory 2G --executor-memory 2G --num-executors 100 insert_tables_spark.py hour_id SOURCE_HIVE_DATABASE TARGET_HIVE_DATABASE
# **
# ** Author:        yangmh
from pyspark.sql import SparkSession, SQLContext
import os
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
name = sys.argv[0][sys.argv[0].rfind(os.sep)+1:].rstrip('.py')
hour_id = sys.argv[1]
SOURCE_HIVE_DATABASE = sys.argv[2]
TARGET_HIVE_DATABASE = sys.argv[3]
#source_table
source_i5_h=SOURCE_HIVE_DATABASE+".source_h"
#des_table
des_i5_h_host_count_hour="source_h_host_times_hour"
spark.sql("set hive.exec.dynamic.partition=true")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
spark.sql("set hive.exec.max.dynamic.partitions=5000")
spark.sql("use "+TARGET_HIVE_DATABASE)
create_table_sql="create table if not exists "+des_i5_h_host_count_hour+" (user_id string,domain_first_lvl string,host_pv bigint) PARTITIONED BY (day_id string, hour_id string )row format delimited fields terminated by '\\t' LOCATION 'hdfs://cluster/xxxx/xxx_hive_db/"+des_i5_h_host_count_hour+"'"
print create_table_sql
LoadHourDataSql = " insert overwrite table " + des_i5_h_host_count_hour + " partition(day_id, hour_id)" \
    " select user_id, domain_first_lvl, count(1) host_pv, substr(p_hour,0,8) day_id, p_hour as hour_id" \
    " from " + source_i5_h + " where substr(p_hour,0,10)='" + hour_id + "'" \
    " group by user_id, domain_first_lvl, p_hour, substr(p_hour,0,8)"
print LoadHourDataSql
spark.sql(create_table_sql)
spark.sql(LoadHourDataSql)
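As the header comment notes, this version is submitted with spark-submit; an invocation might look like the following (the hour_id value and the database names are illustrative):

spark-submit --master yarn --deploy-mode client --driver-memory 2G --executor-memory 2G --num-executors 100 insert_tables_spark.py 2024010112 source_db target_db

For comparison, here is a minimal sketch of the same hourly aggregation written with the DataFrame API instead of spark.sql. This is not part of the original script: the database names and the hour_id value are assumptions, and it presumes the target table already exists with (day_id, hour_id) partitions as created above.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.sql("set hive.exec.dynamic.partition=true")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")

hour_id = "2024010112"                    # assumed yyyymmddhh, matching substr(p_hour,0,10)
src = spark.table("source_db.source_h")   # hypothetical source database name

# filter one hour, count rows per (user_id, domain_first_lvl, p_hour),
# and derive the day_id / hour_id partition columns
hourly = (src
          .where(F.substring("p_hour", 1, 10) == hour_id)
          .groupBy("user_id", "domain_first_lvl", "p_hour")
          .agg(F.count(F.lit(1)).alias("host_pv"))
          .select("user_id", "domain_first_lvl", "host_pv",
                  F.substring("p_hour", 1, 8).alias("day_id"),
                  F.col("p_hour").alias("hour_id")))

# insertInto matches columns by position: data columns first, then the
# partition columns (day_id, hour_id), same as the SQL above
hourly.write.insertInto("target_db.source_h_host_times_hour", overwrite=True)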