将 Spark 数据帧保存为 Hive 中的动态分区表
Posted
技术标签:
【中文标题】将 Spark 数据帧保存为 Hive 中的动态分区表【英文标题】:Save Spark dataframe as dynamic partitioned table in Hive 【发布时间】:2015-09-29 05:54:08 【问题描述】:我有一个示例应用程序可以将 csv 文件读取到数据帧中。可以使用以下方法将数据帧以 parquet 格式存储到 Hive 表中
df.saveAsTable(tablename,mode)
。
上面的代码工作正常,但是我每天都有这么多数据,我想根据创建日期(表中的列)对配置单元表进行动态分区。
有没有办法动态划分数据帧并将其存储到配置单元仓库。希望避免使用 hivesqlcontext.sql(insert into table partittioin by(date)....)
对插入语句进行硬编码。
问题可以看作是:How to save DataFrame directly to Hive?的扩展
非常感谢任何帮助。
【问题讨论】:
【参考方案1】:我相信它的工作原理是这样的:
df
是一个包含年、月和其他列的数据框
df.write.partitionBy('year', 'month').saveAsTable(...)
或
df.write.partitionBy('year', 'month').insertInto(...)
【讨论】:
试过这个 Partitionby 方法。它仅适用于 RDD 级别,一旦创建数据框,大多数方法都是 DBMS 样式的,例如groupby,orderby,但它们不能用于在 Hive 上的不同分区文件夹中写入。 好的,所以可以用 1.4 版本解决。 df.write().mode(SaveMode.Append).partitionBy("date").saveAsTable("Tablename"); .但是,这会将我的日期字段更改为整数值并删除实际日期。例如该列中有 9 个唯一日期,但它们现在存储为 1,2,3....,文件夹名称为 date=1,2,3,... 而不是 date=20141121。让我知道是否有办法做到这一点。 @subramaniam-ramasubramanian:请回复 OP 的问题作为答案,而不是编辑现有答案 这是否适用于覆盖多个动态分区而不会丢失基本目录中的其他分区 这个答案已有五年之久 - 很高兴看到它更新为任何新的语法火花。【参考方案2】:我能够使用 df.write().mode(SaveMode.Append).partitionBy("colname").saveAsTable("Table")
写入分区的配置单元表
我必须启用以下属性才能使其工作。
hiveContext.setConf("hive.exec.dynamic.partition", "true") hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")【讨论】:
我应该在哪里设置以上2个参数?我尝试登录 hive shell 并运行上面的命令,但失败了。我确定我做错了。你能告诉我在哪里可以设置这些属性吗? @VrushankDoshi 您可以在创建 hiveContext 后立即在 spark 程序中设置它。 val sparkConf = new SparkConf() val sc = new SparkContext(sparkConf) val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) hiveContext.setConf("hive.exec.dynamic.partition","true" ) hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict") 从我这边来看,这段代码会覆盖但不附加任何数据。为什么? 它会报错:在基于RDD的表中需要使用insertInto,insertinto需要hive中已经存在的表。【参考方案3】:我也遇到过同样的事情,但我使用了以下技巧。
当我们对任何表进行分区时,分区列就会区分大小写。
分区列应存在于具有相同名称的 DataFrame 中(区分大小写)。代码:
var dbName="your database name"
var finaltable="your table name"
// First check if table is available or not..
if (sparkSession.sql("show tables in " + dbName).filter("tableName='" +finaltable + "'").collect().length == 0)
//If table is not available then it will create for you..
println("Table Not Present \n Creating table " + finaltable)
sparkSession.sql("use Database_Name")
sparkSession.sql("SET hive.exec.dynamic.partition = true")
sparkSession.sql("SET hive.exec.dynamic.partition.mode = nonstrict ")
sparkSession.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")
sparkSession.sql("create table " + dbName +"." + finaltable + "(EMP_ID string,EMP_Name string,EMP_Address string,EMP_Salary bigint) PARTITIONED BY (EMP_DEP STRING)")
//Table is created now insert the DataFrame in append Mode
df.write.mode(SaveMode.Append).insertInto(empDB + "." + finaltable)
【讨论】:
df.write.mode(SaveMode.Append).insertInto(empDB + "." + finaltable) 你不需要提到 partitionBy 吗?例如 df.write.mode(SaveMode.Append).partitionBy("EMP_DEP" ).insertInto(empDB + "." + finaltable) 不需要.. 可选 我的表是 hive 中的现有表【参考方案4】:可以在SparkSession
上这样配置:
spark = SparkSession \
.builder \
...
.config("spark.hadoop.hive.exec.dynamic.partition", "true") \
.config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict") \
.enableHiveSupport() \
.getOrCreate()
或者您可以将它们添加到 .properties 文件中
Spark 配置需要 spark.hadoop
前缀(至少在 2.4 中),这是 Spark 设置此配置的方式:
/**
* Appends spark.hadoop.* configurations from a [[SparkConf]] to a Hadoop
* configuration without the spark.hadoop. prefix.
*/
def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit =
SparkHadoopUtil.appendSparkHadoopConfigs(conf, hadoopConf)
【讨论】:
【参考方案5】:这对我有用。我设置了这些设置,然后将数据放入分区表中。
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode",
"nonstrict")
【讨论】:
【参考方案6】:这对我使用 python 和 spark 2.1.0 有效。
不确定这是否是最好的方法,但它有效......
# WRITE DATA INTO A HIVE TABLE
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.master("local[*]") \
.config("hive.exec.dynamic.partition", "true") \
.config("hive.exec.dynamic.partition.mode", "nonstrict") \
.enableHiveSupport() \
.getOrCreate()
### CREATE HIVE TABLE (with one row)
spark.sql("""
CREATE TABLE IF NOT EXISTS hive_df (col1 INT, col2 STRING, partition_bin INT)
USING HIVE OPTIONS(fileFormat 'PARQUET')
PARTITIONED BY (partition_bin)
LOCATION 'hive_df'
""")
spark.sql("""
INSERT INTO hive_df PARTITION (partition_bin = 0)
VALUES (0, 'init_record')
""")
###
### CREATE NON HIVE TABLE (with one row)
spark.sql("""
CREATE TABLE IF NOT EXISTS non_hive_df (col1 INT, col2 STRING, partition_bin INT)
USING PARQUET
PARTITIONED BY (partition_bin)
LOCATION 'non_hive_df'
""")
spark.sql("""
INSERT INTO non_hive_df PARTITION (partition_bin = 0)
VALUES (0, 'init_record')
""")
###
### ATTEMPT DYNAMIC OVERWRITE WITH EACH TABLE
spark.sql("""
INSERT OVERWRITE TABLE hive_df PARTITION (partition_bin)
VALUES (0, 'new_record', 1)
""")
spark.sql("""
INSERT OVERWRITE TABLE non_hive_df PARTITION (partition_bin)
VALUES (0, 'new_record', 1)
""")
spark.sql("SELECT * FROM hive_df").show() # 2 row dynamic overwrite
spark.sql("SELECT * FROM non_hive_df").show() # 1 row full table overwrite
【讨论】:
【参考方案7】:df1.write.mode("append").format('ORC').partitionBy("date").option('path', '/hdfs_path').saveAsTable("DB.Partition_tablename")
它将使用“日期”列值创建分区,并且还将从 spark DF 写入 hive 中的 Hive 外部表。
【讨论】:
以上是关于将 Spark 数据帧保存为 Hive 中的动态分区表的主要内容,如果未能解决你的问题,请参考以下文章
在 Apache Spark 中,用 Java 将数据帧写入 Hive 表
使用 phoenix 连接器将 Spark 数据帧写入 Hbase