获取 Spark 写入 Hive 元存储的所有新分区
Posted
技术标签:
【中文标题】获取 Spark 写入 Hive 元存储的所有新分区【英文标题】:Get all the new partitions that are written to Hive metastore by Spark 【发布时间】:2019-07-25 13:19:08 【问题描述】:我有一个数据框,我使用 spark sql(使用动态分区)将其插入到现有的分区配置单元表中。 写入数据框后,我想知道我的数据框刚刚在 hive 中创建的分区是什么。
我可以查询不同分区的数据帧,但它需要很长时间,因为它必须启动数据帧的整个沿袭。
我可以在写入 hive 之前保留数据帧,这样,写入操作和不同的 partition_column 操作就发生在缓存的数据帧之上。但是我的数据框非常大,不想花更多时间在持久化上。
我知道所有的分区信息都存储在 Hive Metastore 中。 spark中是否有任何metastore api可以帮助仅检索已创建的新分区?
【问题讨论】:
你对哪一列的数据进行了分区?检查下面..可能会帮助***.com/questions/36095790/… 数据框中的日期列之一。 【参考方案1】:您可以使用HiveMetastoreClient 检索表的分区数据:
import org.apache.hadoop.hive.conf.HiveConf
import scala.collection.JavaConverters._
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient
val hiveConf = new HiveConf(spark.sparkContext.hadoopConfiguration, classOf[HiveConf])
val cli = new HiveMetaStoreClient(hiveConf)
/* Get list of partition values prior to DF insert */
val existingPartitions = cli.listPartitions("<db_name>", "<tbl_name>", Short.MaxValue).asScala.map(_.getValues.asScala.mkString(","))
/* Insert DF contents to table */
df.write.insertInto("<db_name>.<tbl_name>")
/* Fetch list of partition values again, and diff with previous list */
val newPartitions = cli.listPartitions("<db_name>", "<tbl_name>", Short.MaxValue).asScala.map(_.getValues.asScala.mkString(","))
val deltaPartitions = newPartitions.diff(existingPartitions)
【讨论】:
感谢您的回答。这适用于每次我只将新分区写入表的情况。在某些情况下,我的数据框可能会覆盖现有分区。但我想获取数据框写入的所有分区。是否有更多信息,例如写入这些分区的时间,以便我可以获取在特定时间之后写入的所有分区? 是的,listPartitions
返回的 Partition
对象有一个 getCreateTime
访问器,例如cli.listPartitions("<db_name>", "<tbl_name>", Short.MaxValue).asScala.map(part => (part.getValues.asScala.mkString(","), part.getCreateTime))
【参考方案2】:
val epochTime = <epoch time before inserting the dataframe>
val partitionName = <Partition Column Name>
df.write.insertInto("<db_name>.<tbl_name>")
val catalogPartitions = spark.sharedState.externalCatalog.listPartitions("<db_name>", "<tbl_name>")
val partitionValues = catalogPartitions.filter(cp => ((cp.parameters.get("transient_lastDdlTime").isDefined && cp.parameters.getOrElse("transient_lastDdlTime", "0").toLong >= epochTime / 1000) || cp.lastAccessTime >= epochTime || cp.createTime >= epochTime) && cp.spec.contains(datePartition)).map(cp => cp.spec.getOrElse(datePartition, "")
).toList
processedPartitions = partitionValues.toList
在大多数情况下,lastAccessTime 为 0。 createTime 具有创建分区的时间。但是在参数中,我发现了一个新的参数 transient_lastDdlTime ,其中包含分区的更新时间戳。在更安全的方面,检查所有三个以提供在给定纪元时间之后创建或修改的分区。
【讨论】:
以上是关于获取 Spark 写入 Hive 元存储的所有新分区的主要内容,如果未能解决你的问题,请参考以下文章
如何将 Spark-Notebook 连接到 Hive 元存储?
通过 Hive 元存储的 Spark SQL 查询“SHOW VIEWS IN”失败,“IN”处缺少“FUNCTIONS”
黑猴子的家:Spark on hive 与 hive on spark 的区别
使用Spark 编码 写入 hive 的过程中 hive字段乱码 [解决方案]