在读取镶木地板文件时刷新 Dataframe 的元数据

Posted

技术标签:

【中文标题】在读取镶木地板文件时刷新 Dataframe 的元数据【英文标题】:Refresh metadata for Dataframe while reading parquet file 【发布时间】:2019-11-08 07:23:56 【问题描述】:

我正在尝试将 parquet 文件作为数据帧读取,该数据帧将定期更新(路径为 /folder_name。每当有新数据出现时,旧 parquet 文件路径(/folder_name)将重命名为临时路径,然后我们将新数据和旧数据合并,并将存储在旧路径中(/folder_name

假设我们有一个镶木地板文件,在更新之前为hdfs://folder_name/part-xxxx-xxx.snappy.parquet,然后在更新后它更改为hdfs://folder_name/part-00000-yyyy-yyy.snappy.parquet

问题是当我尝试在更新完成时读取镶木地板文件时

sparksession.read.parquet("filename") => 它采用旧路径hdfs://folder_name/part-xxxx-xxx.snappy.parquet(路径存在)

当在数据帧上调用操作时,它试图从 hdfs://folder_name/part-xxxx-xxx.snappy.parquet 读取数据,但由于更新,文件名发生了变化,我遇到了以下问题

java.io.FileNotFoundException:文件不存在:hdfs://folder_name/part-xxxx-xxx.snappy.parquet 基础文件可能已更新。您可以通过在 SQL 中运行“REFRESH TABLE tableName”命令或重新创建相关的 Dataset/DataFrame 来显式地使 Spark 中的缓存无效。

我使用的是 Spark 2.2

谁能帮助我如何刷新元数据?

【问题讨论】:

我在 ipython 内核(jupyter notebook)中得到了这个FileNotFoundException "...snappy.parquet"。通过重启内核修复。 【参考方案1】:

当您尝试读取不存在的文件时会发生该错误。

如果我错了,请纠正我,但我怀疑您在保存新数据帧时覆盖了所有文件(使用 .mode("overwrite"))。在此过程运行时,您正在尝试读取已删除的文件并引发异常 - 这会使表在一段时间内不可用(在更新期间)。

据我所知,没有你想要的“刷新元数据”的直接方法。

两种(几种可能的)解决方法:

1 - 使用附加模式

如果您只想将新数据框附加到旧数据框,则无需创建临时文件夹并覆盖旧数据框。您可以将保存模式从覆盖更改为追加。这样您就可以将分区添加到现有 Parquet 文件中,而无需重写现有分区。

df.write
  .mode("append")
  .parquet("/temp_table")

这是迄今为止最简单的解决方案,无需读取已存储的数据。但是,如果您必须更新旧数据(例如:如果您正在执行 upsert),这将不起作用。为此,您有选项 2:

2 - 使用 Hive 视图

您可以创建配置单元表并使用视图指向最新(和可用)的表。

以下是此方法背后的逻辑示例:

第 1 部分

如果视图&lt;table_name&gt; 不存在,我们将创建一个名为 &lt;table_name&gt;_alpha0存储新数据 创建表后 我们创建一个视图&lt;table_name&gt;select * from <table_name>_alpha0

第 2 部分

如果视图&lt;table_name&gt;存在,我们需要查看它指向哪个表(&lt;table_name&gt;_alphaN)

您对新数据执行所有操作,将其保存为名为@9​​87654330@的表

创建表后,我们将视图 &lt;table_name&gt; 更改为 select * from &lt;table_name&gt;_alpha(N+1)

还有一个代码示例:

import org.apache.spark.sql.DataFrame, Row, SparkSession
import org.apache.spark.sql.types._
import spark.implicits._


//This method verifies if the view exists and returns the table it is pointing to (using the query 'describe formatted')

def getCurrentTable(spark: SparkSession, databaseName:String, tableName: String): Option[String] = 
  if(spark.catalog.tableExists(s"$databaseName.$tableName")) 

    val rdd_desc = spark.sql(s"describe formatted $databaseName.$tableName")
      .filter("col_name == 'View Text'")
      .rdd

    if(rdd_desc.isEmpty()) 
      None
    
    else 
      Option(
        rdd_desc.first()
          .get(1)
          .toString
          .toLowerCase
          .stripPrefix("select * from ")
      )
    
  
  else
    None


//This method saves a dataframe in the next "alpha table" and updates the view. It maintains 'rounds' tables (default=3). I.e. if the current table is alpha2, the next one will be alpha0 again.

def saveDataframe(spark: SparkSession, databaseName:String, tableName: String, new_df: DataFrame, rounds: Int = 3): Unit =
  val currentTable = getCurrentTable(spark, databaseName, tableName).getOrElse(s"$databaseName.$tableName_alpha$rounds-1")
  val nextAlphaTable = currentTable.replace(s"_alpha$currentTable.last",s"_alpha$(currentTable.last.toInt + 1) % rounds")

  new_df.write
    .mode("overwrite")
    .format("parquet")
    .option("compression","snappy")
    .saveAsTable(nextAlphaTable)

  spark.sql(s"create or replace view $databaseName.$tableName as select * from $nextAlphaTable")


//An example on how to use this:

//SparkSession: spark
val df = Seq((1,"I"),(2,"am"),(3,"a"),(4,"dataframe")).toDF("id","text")
val new_data = Seq((5,"with"),(6,"new"),(7,"data")).toDF("id","text")
val dbName = "test_db"
val tableName = "alpha_test_table"

println(s"Current table: $getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")")
println("Saving dataframe")

saveDataframe(spark, dbName, tableName, df)

println("Dataframe saved")
println(s"Current table: $getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")")
spark.read.table(s"$dbName.$tableName").show

val processed_df = df.unionByName(new_data) //Or other operations you want to do

println("Saving new dataframe")
saveDataframe(spark, dbName, tableName, processed_df)

println("Dataframe saved")
println(s"Current table: $getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")")
spark.read.table(s"$dbName.$tableName").show

结果:

Current table: Table does not exist
Saving dataframe
Dataframe saved
Current table: test_db.alpha_test_table_alpha0
+---+---------+
| id|     text|
+---+---------+
|  3|        a|
|  4|dataframe|
|  1|        I|
|  2|       am|
+---+---------+

Saving new dataframe
Dataframe saved
Current table: test_db.alpha_test_table_alpha1
+---+---------+
| id|     text|
+---+---------+
|  3|        a|
|  4|dataframe|
|  5|     with|
|  6|      new|
|  7|     data|
|  1|        I|
|  2|       am|
+---+---------+

这样做可以保证视图&lt;table_name&gt; 的版本始终可用。这也具有维护表的先前版本的优点(或没有,取决于您的情况)。 以前版本的&lt;table_name_alpha1&gt; 将是&lt;table_name_alpha0&gt;

3 - 奖金

如果可以选择升级 Spark 版本,请查看 Delta Lake(最低 Spark 版本:2.4.2)

希望这会有所帮助:)

【讨论】:

我们还没有集成 Hive。所以使用附加模式,它会创建许多文件。有什么选择吗? 您是否尝试在追加之前进行合并以减少文件数量?【参考方案2】:

Spark 没有像 Zookeeper 这样的事务管理器来对文件进行锁定,因此进行并发读/写是一个需要单独处理的挑战。

要刷新目录,您可以执行以下操作:-

spark.catalog.refreshTable("my_table")

spark.sql(s"REFRESH TABLE $tableName")

【讨论】:

我没有使用 hive,所以我没有任何表名。这种情况该怎么办 您是否在独立运行 Spark,它是否与 Hive Metastore 集成?如果已集成,则此选项将起作用。 没有。仅在 Yarn 中产生火花。我们还没有集成 hive 正如我提到的,Spark 没有事务管理器。只有两个选项 1) 使用最近作为开源项目出现的 Delta Lake 2) 为所有元数据操作集成 Hive,这与使用 Hive 不同。最后一个选项是继续在文件夹位置附加新文件并创建一个视图以仅获取最新记录,这将避免 1) 和 2) 复杂性。【参考方案3】:

先缓存 parquet,然后进行覆盖。

var tmp = sparkSession.read.parquet("path/to/parquet_1").cache()
tmp.write.mode(SaveMode.Overwrite).parquet("path/to/parquet_1") // same path

由于 spark 执行惰性求值而引发错误。当 DAG 在“write”命令上执行时,它开始同时读取 parquet 和 write/overwrite。

【讨论】:

【参考方案4】:

    一个简单的解决方案是先使用 df.cache.count 引入内存,然后与新数据联合并以模式 overwrite 写入 /folder_name。在这种情况下,您不必使用 temp 路径。

    您提到要将/folder_name 重命名为某个临时路径。所以你应该从那个临时路径而不是hdfs://folder_name/part-xxxx-xxx.snappy.parquet 中读取旧数据。

【讨论】:

【参考方案5】:

示例

通过阅读您的问题,我认为这可能是您的问题,如果您应该能够在不使用 DeltaLake 的情况下运行您的代码。在下面的用例中,Spark 将这样运行代码: (1) 将 inputDF 加载到本地存储文件夹位置的文件名 [在本例中为显式部分文件名]; (2a) 到达第 2 行并覆盖 tempLocation 中的文件; (2b) 从 inputDF 加载内容并输出到 tempLocation; (3) 按照与 1 相同的步骤,但在 tempLocation 上; (4a) 删除inputLocation文件夹内的文件; (4b) 尝试加载缓存在1中的部分文件从inputDF加载数据运行union并因为文件不存在而break。

val inputDF = spark.read.format("parquet").load(inputLocation)
inputDF.write.format("parquet").mode("overwrite").save(tempLocation)

val tempDF = spark.read.foramt("parquet").load(tempLocation)

val outputDF = inputDF.unionAll(tempDF)
outputDF.write.format("parquet").mode("overwrite").save(inputLocation)

根据我的经验,您可以遵循两条路径持久化或临时输出用于覆盖的所有内容。

持久性

在下面的用例中,我们将加载 inputDF 并立即将其保存为另一个元素并持久化。执行该操作时,持久性将位于数据上,而不是文件夹内的文件路径上。

否则,您可以在 outputDF 上进行持久化,这将具有相对相同的效果。由于持久性与数据而非文件路径相关联,因此输入的破坏不会导致文件路径在覆盖期间丢失。

val inputDF = spark.read.format("parquet").load(inputLocation) 

val inputDF2 = inputDF.persist
inputDF2.count

inputDF2.write.format("parquet").mode("overwrite").save(tempLocation)

val tempDF = spark.read.foramt("parquet").load(tempLocation)

val outputDF = inputDF2.unionAll(tempDF) outputDF.write.format("parquet").mode("overwrite").save(inputLocation)

临时加载

如果您不为联合输入加载临时输出,而是将 outputDF 完全加载到一个临时文件并重新加载该文件以输出,那么您应该不会看到找不到文件错误。

【讨论】:

以上是关于在读取镶木地板文件时刷新 Dataframe 的元数据的主要内容,如果未能解决你的问题,请参考以下文章

使用 pyspark 比较镶木地板文件的元数据

流式镶木地板文件python并且仅下采样

如何在读取镶木地板文件时检查损坏的文件?

将小 PySpark DataFrame 写入镶木地板时出现内存错误

读取镶木地板文件时,有没有办法在 basePath 中使用通配符?

无法从镶木地板中读取零件文件