Hive doesn't read partitioned parquet files generated by Spark

Posted: 2016-02-06 17:16:26

I'm having trouble reading Spark-generated partitioned parquet files in Hive. I can create the external table in Hive, but when I try to select a few rows, Hive only returns an "OK" message with no rows.

I can read the partitioned parquet files correctly in Spark, so I assume they were generated correctly. I can also read the files when I create an external table in Hive without partitions.

Does anyone have any suggestions?

My environment:

Cluster: EMR 4.1.0
Hive 1.0.0
Spark 1.5.0
Hue 3.7.1
Parquet files stored in an S3 bucket (s3://staging-dev/test/ttfourfieldspart2/year=2013/month=11)

My Spark configuration file (/etc/spark/conf.dist/spark-defaults.conf) has the following parameters:

spark.master yarn
spark.driver.extraClassPath /etc/hadoop/conf:/etc/hive/conf:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/*:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*
spark.driver.extraLibraryPath /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native
spark.executor.extraClassPath /etc/hadoop/conf:/etc/hive/conf:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/*:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*
spark.executor.extraLibraryPath /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native
spark.eventLog.enabled true
spark.eventLog.dir hdfs:///var/log/spark/apps
spark.history.fs.logDirectory hdfs:///var/log/spark/apps
spark.yarn.historyServer.address ip-10-37-161-246.ec2.internal:18080
spark.history.ui.port 18080
spark.shuffle.service.enabled true
spark.driver.extraJavaOptions    -Dlog4j.configuration=file:///etc/spark/conf/log4j.properties -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=512M -XX:OnOutOfMemoryError='kill -9 %p'
spark.executor.extraJavaOptions  -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'
spark.executor.memory 4G
spark.driver.memory 4G
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.maxExecutors 100
spark.dynamicAllocation.minExecutors 1

My Hive configuration file (/etc/hive/conf/hive-site.xml) has the following parameters:

<configuration>

<!-- Hive Configuration can either be stored in this file or in the hadoop configuration files  -->
<!-- that are implied by Hadoop setup variables.                                                -->
<!-- Aside from Hadoop setup variables - this file is provided as a convenience so that Hive    -->
<!-- users do not have to edit hadoop configuration files (that may be managed as a centralized -->
<!-- resource).                                                                                 -->

<!-- Hive Execution Parameters -->


<property>
  <name>hbase.zookeeper.quorum</name>
  <value>ip-10-xx-xxx-xxx.ec2.internal</value>
  <description>http://wiki.apache.org/hadoop/Hive/HBaseIntegration</description>
</property>

<property>
  <name>hive.execution.engine</name>
  <value>mr</value>
</property>

  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://ip-10-xx-xxx-xxx.ec2.internal:8020</value>
  </property>

<property>
  <name>hive.metastore.uris</name>
  <value>thrift://ip-10-xx-xxx-xxx.ec2.internal:9083</value>
  <description>JDBC connect string for a JDBC metastore</description>
</property>

<property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://ip-10-xx-xxx-xxx.ec2.internal:3306/hive?createDatabaseIfNotExist=true</value>
    <description>username to use against metastore database</description>
</property>

<property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>org.mariadb.jdbc.Driver</value>
    <description>username to use against metastore database</description>
</property>

<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>hive</value>
  <description>username to use against metastore database</description>
</property>

<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>1R72JFCDG5XaaDTB</value>
  <description>password to use against metastore database</description>
</property>

  <property>
    <name>datanucleus.fixedDatastore</name>
    <value>true</value>
  </property>

  <property>
    <name>mapred.reduce.tasks</name>
    <value>-1</value>
  </property>

  <property>
    <name>mapred.max.split.size</name>
    <value>256000000</value>
  </property>

  <property>
    <name>hive.metastore.connect.retries</name>
    <value>5</value>
  </property>

  <property>
    <name>hive.optimize.sort.dynamic.partition</name>
    <value>true</value>
  </property>

  <property><name>hive.exec.dynamic.partition</name><value>true</value></property>
  <property><name>hive.exec.dynamic.partition.mode</name><value>nonstrict</value></property>
  <property><name>hive.exec.max.dynamic.partitions</name><value>10000</value></property>
  <property><name>hive.exec.max.dynamic.partitions.pernode</name><value>500</value></property>

</configuration>

My Python code for reading the partitioned parquet files:

from pyspark import *
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

df7 = sqlContext.read.parquet('s3://staging-dev/test/ttfourfieldspart2/')

The parquet file schema as printed by Spark:

>>> df7.schema
StructType(List(StructField(transactionid,StringType,true),StructField(eventts,TimestampType,true),StructField(year,IntegerType,true),StructField(month,IntegerType,true)))

>>> df7.printSchema()
root
 |-- transactionid: string (nullable = true)
 |-- eventts: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

>>> df7.show(10)
+--------------------+--------------------+----+-----+
|       transactionid|             eventts|year|month|
+--------------------+--------------------+----+-----+
|f7018907-ed3d-49b...|2013-11-21 18:41:...|2013|   11|
|f6d95a5f-d4ba-489...|2013-11-21 18:41:...|2013|   11|
|02b2a715-6e15-4bb...|2013-11-21 18:41:...|2013|   11|
|0e908c0f-7d63-48c...|2013-11-21 18:41:...|2013|   11|
|f83e30f9-950a-4b9...|2013-11-21 18:41:...|2013|   11|
|3425e4ea-b715-476...|2013-11-21 18:41:...|2013|   11|
|a20a6aeb-da4f-4fd...|2013-11-21 18:41:...|2013|   11|
|d2f57e6f-889b-49b...|2013-11-21 18:41:...|2013|   11|
|46f2eda5-408e-44e...|2013-11-21 18:41:...|2013|   11|
|36fb8b79-b2b5-493...|2013-11-21 18:41:...|2013|   11|
+--------------------+--------------------+----+-----+
only showing top 10 rows

The CREATE TABLE statement in Hive:

create external table if not exists t3(
  transactionid string,
  eventts timestamp)
partitioned by (year int, month int)
stored as parquet
location 's3://staging-dev/test/ttfourfieldspart2/';

When I try to select some rows in Hive, no rows are returned:

hive> select * from t3 limit 10;
OK
Time taken: 0.027 seconds
hive> 

Answer 1:

I finally found the problem. When you create a table in Hive and the partitioned data already exists in S3 or HDFS, you need to run a command to update the Hive metastore with the table's partition structure. See: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-RecoverPartitions(MSCKREPAIRTABLE)

The commands are:

MSCK REPAIR TABLE table_name;


On Hive running on Amazon EMR, you can also use:

ALTER TABLE table_name RECOVER PARTITIONS;
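
For reference, here is a minimal sketch of running the same repair and a quick verification from PySpark, assuming sqlContext is a HiveContext (the default on EMR's Spark build with Hive support) and using the table name t3 from the question; if your Spark version does not pass MSCK through to Hive, run the equivalent statements in the Hive CLI instead:

# Assumes sqlContext is a HiveContext, so these statements run against the Hive metastore.
sqlContext.sql("MSCK REPAIR TABLE t3")

# Check that the partitions are now registered and that rows come back.
sqlContext.sql("SHOW PARTITIONS t3").show()
sqlContext.sql("SELECT * FROM t3 LIMIT 10").show()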

Comments:

This worked for me too. Brand-new table, and it had to be repaired before a select would return any data... Thanks!

Answer 2:

Even though this question has already been answered, the following points may help users for whom MSCK REPAIR TABLE table_name; alone does not solve the problem.

I have an HDFS filesystem partitioned as follows:

<parquet_file>/<partition1>/<partition2>

For example: my_file.pq/column_5=test/column_6=5

I created a partitioned Hive table, for example:

CREATE EXTERNAL TABLE myschema.my_table(
`column_1` int,
`column_2` string,
`column_3` string,
`column_4` string
)
PARTITIONED BY (`column_5` string, `column_6` int) STORED AS PARQUET
LOCATION
  'hdfs://u/users/iamr/my_file.pq'

After that, I repaired the table's partitions with the following command:

MSCK REPAIR TABLE myschema.my_table;

After that it started working for me.

Another thing I noticed: when writing parquet files from Spark, name the columns in lowercase, otherwise Hive may not be able to map them. For me, it started working after I renamed the columns in the parquet files.

For example: my_file.pq/COLUMN_5=test/COLUMN_6=5 did not work for me,

but my_file.pq/column_5=test/column_6=5 did.
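
A minimal PySpark sketch of a write that keeps the column and partition names lowercase, producing a layout like the one above; the source DataFrame df is a placeholder, while the path and partition columns follow the answer's example:

from pyspark.sql import functions as F

# Force every column name to lowercase before writing, since Hive treats
# column and partition names as case-insensitive lowercase identifiers.
df_lower = df.select([F.col(c).alias(c.lower()) for c in df.columns])

# Write partitioned parquet; the partition directories are created as
# column_5=.../column_6=... with lowercase names.
(df_lower.write
    .partitionBy('column_5', 'column_6')
    .mode('overwrite')
    .parquet('hdfs://u/users/iamr/my_file.pq'))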
