Hive 不读取 Spark 生成的分区 parquet 文件
Posted
技术标签:
【中文标题】Hive 不读取 Spark 生成的分区 parquet 文件【英文标题】:Hive doesn't read partitioned parquet files generated by Spark 【发布时间】:2016-02-06 17:16:26 【问题描述】:我在读取 Hive 中的 Spark 生成的分区拼花文件时遇到问题。我可以在 hive 中创建外部表,但是当我尝试选择几行时,hive 只返回没有行的“OK”消息。
我能够在 Spark 中正确读取分区 parquet 文件,因此我假设它们是正确生成的。 当我在 hive 中创建一个没有分区的外部表时,我也能够读取这些文件。
有人有什么建议吗?
我的环境是:
集群 EMR 4.1.0 Hive 1.0.0 火花 1.5.0 色相 3.7.1 Parquet 文件存储在 S3 存储桶中 (s3://staging-dev/test/ttfourfieldspart2/year=2013/month=11)我的 Spark 配置文件有以下参数(/etc/spark/conf.dist/spark-defaults.conf):
spark.master yarn
spark.driver.extraClassPath /etc/hadoop/conf:/etc/hive/conf:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/*:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*
spark.driver.extraLibraryPath /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native
spark.executor.extraClassPath /etc/hadoop/conf:/etc/hive/conf:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/*:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*
spark.executor.extraLibraryPath /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native
spark.eventLog.enabled true
spark.eventLog.dir hdfs:///var/log/spark/apps
spark.history.fs.logDirectory hdfs:///var/log/spark/apps
spark.yarn.historyServer.address ip-10-37-161-246.ec2.internal:18080
spark.history.ui.port 18080
spark.shuffle.service.enabled true
spark.driver.extraJavaOptions -Dlog4j.configuration=file:///etc/spark/conf/log4j.properties -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=512M -XX:OnOutOfMemoryError='kill -9 %p'
spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'
spark.executor.memory 4G
spark.driver.memory 4G
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.maxExecutors 100
spark.dynamicAllocation.minExecutors 1
Hive 配置文件有以下参数(/etc/hive/conf/hive-site.xml):
<configuration>
<!-- Hive Configuration can either be stored in this file or in the hadoop configuration files -->
<!-- that are implied by Hadoop setup variables. -->
<!-- Aside from Hadoop setup variables - this file is provided as a convenience so that Hive -->
<!-- users do not have to edit hadoop configuration files (that may be managed as a centralized -->
<!-- resource). -->
<!-- Hive Execution Parameters -->
<property>
<name>hbase.zookeeper.quorum</name>
<value>ip-10-xx-xxx-xxx.ec2.internal</value>
<description>http://wiki.apache.org/hadoop/Hive/HBaseIntegration</description>
</property>
<property>
<name>hive.execution.engine</name>
<value>mr</value>
</property>
<property>
<name>fs.defaultFS</name>
<value>hdfs://ip-10-xx-xxx-xxx.ec2.internal:8020</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://ip-10-xx-xxx-xxx.ec2.internal:9083</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://ip-10-xx-xxx-xxx.ec2.internal:3306/hive?createDatabaseIfNotExist=true</value>
<description>username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>org.mariadb.jdbc.Driver</value>
<description>username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hive</value>
<description>username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>1R72JFCDG5XaaDTB</value>
<description>password to use against metastore database</description>
</property>
<property>
<name>datanucleus.fixedDatastore</name>
<value>true</value>
</property>
<property>
<name>mapred.reduce.tasks</name>
<value>-1</value>
</property>
<property>
<name>mapred.max.split.size</name>
<value>256000000</value>
</property>
<property>
<name>hive.metastore.connect.retries</name>
<value>5</value>
</property>
<property>
<name>hive.optimize.sort.dynamic.partition</name>
<value>true</value>
</property>
<property><name>hive.exec.dynamic.partition</name><value>true</value></property>
<property><name>hive.exec.dynamic.partition.mode</name><value>nonstrict</value></property>
<property><name>hive.exec.max.dynamic.partitions</name><value>10000</value></property>
<property><name>hive.exec.max.dynamic.partitions.pernode</name><value>500</value></property>
</configuration>
我读取分区拼花文件的python代码:
from pyspark import *
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
df7 = sqlContext.read.parquet('s3://staging-dev/test/ttfourfieldspart2/')
Spark 打印的 parquet 文件架构:
>>> df7.schema
StructType(List(StructField(transactionid,StringType,true),StructField(eventts,TimestampType,true),StructField(year,IntegerType,true),StructField(month,IntegerType,true)))
>>> df7.printSchema()
root
|-- transactionid: string (nullable = true)
|-- eventts: timestamp (nullable = true)
|-- year: integer (nullable = true)
|-- month: integer (nullable = true)
>>> df7.show(10)
+--------------------+--------------------+----+-----+
| transactionid| eventts|year|month|
+--------------------+--------------------+----+-----+
|f7018907-ed3d-49b...|2013-11-21 18:41:...|2013| 11|
|f6d95a5f-d4ba-489...|2013-11-21 18:41:...|2013| 11|
|02b2a715-6e15-4bb...|2013-11-21 18:41:...|2013| 11|
|0e908c0f-7d63-48c...|2013-11-21 18:41:...|2013| 11|
|f83e30f9-950a-4b9...|2013-11-21 18:41:...|2013| 11|
|3425e4ea-b715-476...|2013-11-21 18:41:...|2013| 11|
|a20a6aeb-da4f-4fd...|2013-11-21 18:41:...|2013| 11|
|d2f57e6f-889b-49b...|2013-11-21 18:41:...|2013| 11|
|46f2eda5-408e-44e...|2013-11-21 18:41:...|2013| 11|
|36fb8b79-b2b5-493...|2013-11-21 18:41:...|2013| 11|
+--------------------+--------------------+----+-----+
only showing top 10 rows
Hive 中的创建表:
create external table if not exists t3(
transactionid string,
eventts timestamp)
partitioned by (year int, month int)
stored as parquet
location 's3://staging-dev/test/ttfourfieldspart2/';
当我尝试在 Hive 中选择一些行时,它不会返回任何行:
hive> select * from t3 limit 10;
OK
Time taken: 0.027 seconds
hive>
【问题讨论】:
【参考方案1】:我终于找到了问题所在。当您在 Hive 中创建表时,分区数据已存在于 S3 或 HDFS 中,您需要运行命令以使用表的分区结构更新 Hive Metastore。看看这里: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-RecoverPartitions(MSCKREPAIRTABLE)
The commands are:
MSCK REPAIR TABLE table_name;
And on Hive running in Amazon EMR you can use:
ALTER TABLE table_name RECOVER PARTITIONS;
【讨论】:
这对我也有用。全新的表,在 select 将返回任何数据之前,必须修复它... K... 谢谢!【参考方案2】:即使这个问题已经回答了,以下几点也可以帮助那些仅仅通过MSCK REPAIR TABLE table_name;
还不能解决问题的用户
我有一个 hdfs 文件系统,分区如下:
<parquet_file>/<partition1>/<partition2>
例如:my_file.pq/column_5=test/column_6=5
我创建了一个带分区的配置单元表
例如:
CREATE EXTERNAL TABLE myschema.my_table(
`column_1` int,
`column_2` string,
`column_3` string,
`column_4` string
)
PARTITIONED BY (`column_5` string, `column_6` int) STORED AS PARQUET
LOCATION
'hdfs://u/users/iamr/my_file.pq'
在此之后,我使用以下命令修复了架构分区
MSCK REPAIR TABLE myschema.my_table;
此后它开始为我工作。
我注意到的另一件事是,在从 spark 编写 PARQUET 文件时,将列命名为小写,否则 hive 可能无法映射它。对我来说,重命名 PARQUET 文件中的列后,它开始工作了
例如:my_file.pq/COLUMN_5=test/COLUMN_6=5
对我不起作用
但是my_file.pq/column_5=test/column_6=5
工作了
【讨论】:
以上是关于Hive 不读取 Spark 生成的分区 parquet 文件的主要内容,如果未能解决你的问题,请参考以下文章
源码级解读如何解决Spark-sql读取hive分区表执行效率低问题
使用 spark hivecontext 读取外部 hive 分区表的问题