Hive 和 Spark SQL

Posted 小数据自留地

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hive 和 Spark SQL相关的知识,希望对你有一定的参考价值。

这几天看到了一篇滴滴关于Hive SQL迁移到Spark SQL的技术文章 现在把相关知识进行一下整理


Hive On Spark和Spark SQL的共同点


首先Hive On Spark和Spark SQL都是一个翻译层,把一个 SQL翻译成分布式可执行的Spark程序, Hive和SparkSQL都不负责计算,它们只是告诉Spark,你需要这样算那样算,但是本身并不直接参与计算。


在CDH平台上, Hive On Spark 可以通过以下二个参数进行配置:

Spark On YARN Service:Name of the Spark on YARN service that this Hive service instance depends on. If selected, Hive jobs can use the Spark execution engine instead of MapReduce2. hive.execution.engine:Default Execution Engine, 这里可以配置Spark或者MapReduce

这二个参数都选择spark的话,那么Hive的默认执行引擎就会使用Spark,如果因为特殊原因需要使用mapreduce的话,可以在直接在HiveSQl 设置参数

set hive.execution.engine=mr;


Hive On Spark和Spark SQL的差异


在滴滴的那篇文章中性能&功能差异的这一段提到了:

Hive SQL可以通过设置以下配置合并小文件,MR Job结束后,判断生成文件的平均大小,如果小于阀值,就再启动一个Job来合并文件,目前Spark SQL不支持小文件合并。

滴滴大数据,公众号:数据管道

在CDH中可以在HIVE 里配置:

hive.merge.mapfilesMerge small files at the end of a map-only job. When enabled, a map-only job is created to merge the files in the destination table/partitions.hive.merge.mapredfilesMerge small files at the end of a map-reduce jobWhen enableda map-only job is created to merge the files in the destination table/partitions.hive.merge.sparkfiles: Merge small files at the end of a Spark jobWhen enableda map-only job is created to merge the files in the destination table/partitions.

这里需要注意的是hive.merge.sparkfiles,这里是Hive里面的设置,也就是说使用Hive on spark,小文件还是会被自动进行合并的, 但是我们直接写pyspark, SparkSQL这样是不会被合并的。滴滴是参考Hive SQL的实现在Spark SQL中引入了小文件合并功能。我们在生产中使用Spark streaming同步应用日志也遇到了同样的问题,我们采取的是用spark先写入一个临时分区外表的HDFS目录中, 然后通过定时任务的方式Load数据到临时表,再调用Hive的Insert overwrite这个临时表中的数据到新的表中,这样小文件就会被Hive任务合并了,最后还需要清理掉分区和分区临时表中的数据。大致的逻辑如下:

#载入Spark写入HDFS的数据到临时表中sql="load data inpath '$TARGET_DIR' OVERWRITE into table $HIVE_ORG_TABLE partition(dt='$DB_DATE'hour='$DB_HOUR');"  hive -e "$sql"
#INSERT OVERWRITE的临时表数据到新表中,并进行文件合并sql="INSERT OVERWRITE TABLE $HIVE_DEST_TABLE partition(dt='$DB_DATE'hour='$DB_HOUR')SELECT logging_timestamp,tags,source,messageFROM $HIVE_ORG_TABLEWHERE dt='$DB_DATE'AND hour='$DB_HOUR';"hive -e "$sql"
#删除临时表的分区sql="ALTER TABLE $HIVE_ORG_TABLE DROP partition(dt='$DB_DATE', hour='$DB_HOUR');"hive -e "$sql"


滴滴的文章中还提到了一部分Hive SQL和Spark SQL在语法上的差异,这也是我们注意到过的问题,比如几个比较典型的:

    •  支持CREATE TEMPORARY TABLE

    • 各类Hive UDF的支持调用,主要包括get_json_object,datediff,unix_timestamp,to_date,collect_set,date_sub  [SPARK-33721]

    • DROP不存在的表和分区,Spark SQL报错,Hive SQL 正常  [SPARK-33637]

    • 删除分区时支持设置过滤条件 [SPARK-33691]

    滴滴大数据,公众号:数据管道

    体来说,大多数情况下,二者基本可以通用,但特殊情况下仍然需要对二者支持的功能进行取舍。这里举一个我自身遇到的情况, 在处理比较复杂的日志数据的时候,Hive或者Spark没有自带的函数能够处理, 这时候 我们需要引入UDF, 在Hive中引入UDF需要单独开发Java打成jar包,我自己主要以pyhon开发为主,在pyspark直接开发udf会简单很多:

    #Pyspark 2.2.0-cdh6.0.1
    from pyspark.sql.functions import udf
    def url_reducer(url):    return "http://www.pyspark.test"
    #Converting function to UDFfunc_url = udf(url_reducer, StringType())
    # UDFSaprkSQL中的调用 sqlDF.withColumn('new_column',func_url(sqlDF['url']))


    以上是关于Hive 和 Spark SQL的主要内容,如果未能解决你的问题,请参考以下文章

    使用Spark创建HIVE-SQL练习环境原创首发

    Hive和Spark SQL优化

    Hive和Spark SQL优化

    spark SQL和hive到底啥关系

    Spark-Sql整合hive,在spark-sql命令和spark-shell命令下执行sql命令和整合调用hive

    0643-Spark SQL Thrift简介