Hive 和 Spark SQL
Posted 小数据自留地
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hive 和 Spark SQL相关的知识,希望对你有一定的参考价值。
这几天看到了一篇滴滴关于Hive SQL迁移到Spark SQL的技术文章 现在把相关知识进行一下整理
Hive On Spark和Spark SQL的共同点
首先Hive On Spark和Spark SQL都是一个翻译层,把一个 SQL翻译成分布式可执行的Spark程序, Hive和SparkSQL都不负责计算,它们只是告诉Spark,你需要这样算那样算,但是本身并不直接参与计算。
在CDH平台上, Hive On Spark 可以通过以下二个参数进行配置:
Spark On YARN Service:Name of the Spark on YARN service that this Hive service instance depends on. If selected, Hive jobs can use the Spark execution engine instead of MapReduce2.
hive.execution.engine:Default Execution Engine, 这里可以配置Spark或者
MapReduce
这二个参数都选择spark的话,那么Hive的默认执行引擎就会使用Spark,如果因为特殊原因需要使用mapreduce的话,可以在直接在HiveSQl 设置参数
set hive.execution.engine=mr;
Hive On Spark和Spark SQL的差异
在滴滴的那篇文章中性能&功能差异的这一段提到了:
Hive SQL可以通过设置以下配置合并小文件,MR Job结束后,判断生成文件的平均大小,如果小于阀值,就再启动一个Job来合并文件,目前Spark SQL不支持小文件合并。
滴滴大数据,公众号:数据管道
在CDH中可以在HIVE 里配置:
hive.merge.mapfiles:Merge small files at the end of a map-only job. When enabled, a map-only job is created to merge the files in the destination table/partitions.
hive.merge.mapredfiles:Merge small files at the end of a map-reduce job. When enabled, a map-only job is created to merge the files in the destination table/partitions.
hive.merge.sparkfiles: Merge small files at the end of a Spark job. When enabled, a map-only job is created to merge the files in the destination table/partitions.
这里需要注意的是hive.merge.sparkfiles,这里是Hive里面的设置,也就是说使用Hive on spark,小文件还是会被自动进行合并的, 但是我们直接写pyspark, SparkSQL这样是不会被合并的。滴滴是参考Hive SQL的实现在Spark SQL中引入了小文件合并功能。我们在生产中使用Spark streaming同步应用日志也遇到了同样的问题,我们采取的是用spark先写入一个临时分区外表的HDFS目录中, 然后通过定时任务的方式Load数据到临时表,再调用Hive的Insert overwrite这个临时表中的数据到新的表中,这样小文件就会被Hive任务合并了,最后还需要清理掉分区和分区临时表中的数据。大致的逻辑如下:
#载入Spark写入HDFS的数据到临时表中
sql="load data inpath '$TARGET_DIR' OVERWRITE into table $HIVE_ORG_TABLE partition(dt='$DB_DATE', hour='$DB_HOUR');"
hive -e "$sql"
#INSERT OVERWRITE的临时表数据到新表中,并进行文件合并
sql="
INSERT OVERWRITE TABLE $HIVE_DEST_TABLE partition(dt='$DB_DATE', hour='$DB_HOUR')
SELECT logging_timestamp,
tags,
source,
message
FROM $HIVE_ORG_TABLE
WHERE dt='$DB_DATE'
AND hour='$DB_HOUR'
;
"
hive -e "$sql"
#删除临时表的分区
sql="
ALTER TABLE $HIVE_ORG_TABLE DROP partition(dt='$DB_DATE', hour='$DB_HOUR');
"
hive -e "$sql"
滴滴的文章中还提到了一部分Hive SQL和Spark SQL在语法上的差异,这也是我们注意到过的问题,比如几个比较典型的:
支持CREATE TEMPORARY TABLE
各类Hive UDF的支持调用,主要包括get_json_object,datediff,unix_timestamp,to_date,collect_set,date_sub [SPARK-33721]
DROP不存在的表和分区,Spark SQL报错,Hive SQL 正常 [SPARK-33637]
删除分区时支持设置过滤条件 [SPARK-33691]
滴滴大数据,公众号:数据管道
整体来说,大多数情况下,二者基本可以通用,但特殊情况下仍然需要对二者支持的功能进行取舍。这里举一个我自身遇到的情况, 在处理比较复杂的日志数据的时候,Hive或者Spark没有自带的函数能够处理, 这时候 我们需要引入UDF, 在Hive中引入UDF需要单独开发Java打成jar包,我自己主要以pyhon开发为主,在pyspark直接开发udf会简单很多:
#Pyspark 2.2.0-cdh6.0.1
from pyspark.sql.functions import udf
def url_reducer(url):
return "http://www.pyspark.test"
#Converting function to UDF
func_url = udf(url_reducer, StringType())
# UDF 在SaprkSQL中的调用
sqlDF.withColumn('new_column',func_url(sqlDF['url']))
以上是关于Hive 和 Spark SQL的主要内容,如果未能解决你的问题,请参考以下文章