HIVE_SQL to SparkSQL: A Simple Example


For a work task I wrote a Hive SQL script to process some data. The partner team felt Hive wasn't up to the job and asked for it to be converted to Spark. A direct conversion was tried first and turned out to have problems; after two rounds of debugging the conversion was completed.

This is pure engineering code: there is no discussion of internals and no parameter tuning; the job parameters are simply the original Hive SQL settings carried over.
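
The heart of the final conversion (section 3 below) is replacing the hive -e shell-out with a Hive-enabled SparkSession that runs the same statements through spark.sql(). A minimal sketch of that pattern, with a placeholder database name:

from pyspark.sql import SparkSession

# A Hive-enabled session makes metastore tables and HiveQL available
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.sql("set hive.exec.dynamic.partition=true")  # Hive settings carry over as-is
spark.sql("use tgt_db")                            # tgt_db is a placeholder
spark.sql("show tables").show()                    # any HiveQL statement runs here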



1. Original Hive SQL code:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# insert_tables.py
# *****************************************************************************
# ** File:         insert_tables.py
# ** Description:
# **
# ** Notes:
# ** Input tables:  bx_mgsx_i2_h  bx_mgsx_i3_h  i5_h
# **
# ** Invocation:
# **   on demand:   python insert_tables.py day_id SOURCE_HIVE_DATABASE TARGET_HIVE_DATABASE
# ** Author:        yangmh

import os, sys
import datetime
import time

# Script name without the .py extension (rstrip('.py') strips a character set,
# not a suffix, so use splitext instead)
name = os.path.splitext(os.path.basename(sys.argv[0]))[0]
day_id = sys.argv[1]
SOURCE_HIVE_DATABASE = sys.argv[2]
TARGET_HIVE_DATABASE = sys.argv[3]

# Set PYTHONPATH
current_path = os.path.dirname(os.path.abspath(__file__))
HIVE_DATABASE = 'yangmh'
# Each statement is shipped out to the Hive CLI as: hive -e "use <db>; <sql>;"
hiveCmd = 'hive -e "' + 'use ' + TARGET_HIVE_DATABASE + ';'

# Source table
source_i5_h = SOURCE_HIVE_DATABASE + ".i5_h"
# Destination table
des_i5_h_host_count_hour = "i5_h_host_times"

###############################################################################
# Create the destination table (i5_h_host_count)
###############################################################################
def CreateLogTable():
    Createtables_des_i5_h_host_count_hour = "create table if not exists " + des_i5_h_host_count_hour + " (user_id string,host_pv bigint,host_uv bigint) PARTITIONED BY (hour_list string,day_id string) row format delimited fields terminated by '\\t'"
    os.system(hiveCmd + Createtables_des_i5_h_host_count_hour + ';"')

###############################################################################
# Load data into the partitioned table
###############################################################################
def LoadDataToTable(day_id):
    LoadDataSql = " set hive.exec.dynamic.partition=true;\
    set hive.exec.dynamic.partition.mode=nonstrict;\
    set hive.exec.max.dynamic.partitions=5000; \
    from (select user_id,domain_first_lvl, \
    substr(p_hour,9,2) as hour_list,substr(p_hour,0,8) day_id \
    ,count(1) host_pv from " + source_i5_h + " where substr(p_hour,0,8)='" + day_id + "' \
    group by user_id,domain_first_lvl,substr(p_hour,9,2),substr(p_hour,0,8)) t \
    insert overwrite table " + des_i5_h_host_count_hour + " partition(hour_list,day_id) \
    select user_id,sum(host_pv) host_pv,count(1) host_uv,hour_list,day_id group by user_id,hour_list,day_id \
    ;"
    print hiveCmd + LoadDataSql + '"'
    os.system(hiveCmd + LoadDataSql + '"')

# Execution starts here
try:
    #DropLogTable()
    CreateLogTable()
    LoadDataToTable(day_id)
except Exception as e:
    print e
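
For reference, an invocation matching the header comment might look like this (the day and database names are placeholders; day_id is presumably in yyyymmdd form, since the script matches it against substr(p_hour,0,8)):

python insert_tables.py 20190101 src_db tgt_db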



2. First revision (fine in local testing, but on the customer's cluster it tried to read every database and failed):



#!/usr/bin/env python
# -*- coding: utf-8 -*-
# insert_tables_spark.py

import os, sys
import datetime
import time

# Script name without the .py extension
name = os.path.splitext(os.path.basename(sys.argv[0]))[0]
hour_id = sys.argv[1]
SOURCE_HIVE_DATABASE = sys.argv[2]
TARGET_HIVE_DATABASE = sys.argv[3]

#current_path = os.path.dirname(os.path.abspath(__file__))

# Same shell-out pattern as before, but through the spark-sql CLI
hiveCmd = 'spark-sql \
        --master yarn \
        --deploy-mode client \
        --driver-memory 4G \
        --executor-memory 4G \
        --num-executors 10 \
        -e "' + 'use ' + TARGET_HIVE_DATABASE + ';'

# Source table
source_i5_h = SOURCE_HIVE_DATABASE + ".i5_h"
# Destination table
des_i5_h_host_count_hour_hour = "i5_h_host_times_hour"

###############################################################################
# Create the destination table (i5_h_host_count)
###############################################################################
def CreateLogHourTable():
    # The closing ';' is appended by os.system below, so the SQL string itself
    # must not end with one
    Createtables_des_i5_h_host_count_hour = "create table if not exists " + des_i5_h_host_count_hour_hour + " (user_id string,domain_first_lvl string,host_pv bigint) PARTITIONED BY (day_id string, hour_id string) row format delimited fields terminated by '\\t' LOCATION 'hdfs://cluster/xxxx/xxx_hive_db/" + des_i5_h_host_count_hour_hour + "'"
    os.system(hiveCmd + Createtables_des_i5_h_host_count_hour + ';"')

###############################################################################
# Load data into the partitioned table
###############################################################################
def LoadHourDataToTable(hour_id):
    LoadHourDataSql = " set hive.exec.dynamic.partition=true;\
    set hive.exec.dynamic.partition.mode=nonstrict;\
    set hive.exec.max.dynamic.partitions=5000; \
    insert overwrite table " + des_i5_h_host_count_hour_hour + " partition(day_id,hour_id) \
    select user_id,domain_first_lvl,count(1) host_pv,substr(p_hour,0,8) day_id,p_hour as hour_id \
    from " + source_i5_h + " where substr(p_hour,0,10)='" + hour_id + "' \
    group by user_id,domain_first_lvl,p_hour,substr(p_hour,0,8);"
    print hiveCmd + LoadHourDataSql + '"'
    os.system(hiveCmd + LoadHourDataSql + '"')

try:
    CreateLogHourTable()
    LoadHourDataToTable(hour_id)
except Exception as e:
    print e
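
The command this version prints and executes is a single long shell line, roughly of this shape (the target database name is a placeholder and the SQL is abbreviated here):

spark-sql --master yarn --deploy-mode client --driver-memory 4G --executor-memory 4G --num-executors 10 -e "use tgt_db; set hive.exec.dynamic.partition=true; ... insert overwrite table i5_h_host_times_hour partition(day_id,hour_id) select ...;"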

    

3. Second revision (the final, working version):


#!/usr/bin/env python
# -*- coding: utf-8 -*-
# insert_tables_spark.py
# *****************************************************************************
# ** File:         insert_tables_spark.py
# ** Description:
# **
# ** Notes:
# ** Input table:   source_h
# ** Output table:  source_h_host_times_hour
# ** Invocation:
# **   on demand:   spark-submit --master yarn --deploy-mode client --driver-memory 2G --executor-memory 2G --num-executors 100 insert_tables_spark.py hour_id SOURCE_HIVE_DATABASE TARGET_HIVE_DATABASE
# ** Author:        yangmh

from pyspark.sql import SparkSession

import os
import sys
reload(sys)
sys.setdefaultencoding('utf-8')  # Python 2 workaround for UTF-8 in the SQL strings

# Run the SQL through a Hive-enabled SparkSession inside the driver, instead of
# shelling out to a CLI
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Script name without the .py extension
name = os.path.splitext(os.path.basename(sys.argv[0]))[0]
hour_id = sys.argv[1]
SOURCE_HIVE_DATABASE = sys.argv[2]
TARGET_HIVE_DATABASE = sys.argv[3]

# Source table
source_i5_h = SOURCE_HIVE_DATABASE + ".source_h"
# Destination table
des_i5_h_host_count_hour = "source_h_host_times_hour"

# Carry the Hive dynamic-partition settings over unchanged
spark.sql("set hive.exec.dynamic.partition=true")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
spark.sql("set hive.exec.max.dynamic.partitions=5000")
spark.sql("use " + TARGET_HIVE_DATABASE)

create_table_sql = "create table if not exists " + des_i5_h_host_count_hour + " (user_id string,domain_first_lvl string,host_pv bigint) PARTITIONED BY (day_id string, hour_id string) row format delimited fields terminated by '\\t' LOCATION 'hdfs://cluster/xxxx/xxx_hive_db/" + des_i5_h_host_count_hour + "'"
print create_table_sql

LoadHourDataSql = " insert overwrite table " + des_i5_h_host_count_hour + " partition(day_id,hour_id) \
    select user_id,domain_first_lvl,count(1) host_pv,substr(p_hour,0,8) day_id,p_hour as hour_id \
    from " + source_i5_h + " where substr(p_hour,0,10)='" + hour_id + "' \
    group by user_id,domain_first_lvl,p_hour,substr(p_hour,0,8)"
print LoadHourDataSql

spark.sql(create_table_sql)
spark.sql(LoadHourDataSql)
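
A sample invocation matching the header comment (database names are placeholders; hour_id is presumably in yyyymmddhh form, since the script matches it against substr(p_hour,0,10)):

spark-submit --master yarn --deploy-mode client --driver-memory 2G --executor-memory 2G --num-executors 100 insert_tables_spark.py 2019010100 src_db tgt_db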

