Spark Dataset - 每行的“编辑”镶木地板文件

Posted

技术标签:

【中文标题】Spark Dataset - 每行的“编辑”镶木地板文件【英文标题】:Spark Dataset - "edit" parquet file for each row 【发布时间】:2022-01-03 20:42:03 【问题描述】:

上下文

我正在尝试使用 Spark/Scala 来有效地“编辑”多个 parquet 文件(可能超过 50k)。唯一需要进行的编辑是根据给定的一组行 ID 进行删除(即删除记录/行)。

parquet 文件作为分区 DataFrame 存储在 s3 中,其中示例分区如下所示:

s3://mybucket/transformed/year=2021/month=11/day=02/*.snappy.parquet

每个分区可以有超过 100 个 parquet 文件,每个文件的大小在 50mb 到 500mb 之间。

输入

我们得到了一个名为 filesToModify 的火花 Dataset[MyClass],它有 2 列:

    s3path: String = s3 中需要编辑的 parquet 文件的完整 s3 路径 ids: Set[String] = 在位于s3path 的 parquet 文件中需要删除的一组 ID(行)

示例输入数据集filesToModify

s3path ids
s3://mybucket/transformed/year=2021/month=11/day=02/part-1.snappy.parquet Set("a", "b")
s3://mybucket/transformed/year=2021/month=11/day=02/part-2.snappy.parquet Set("b")

预期行为

鉴于 filesToModify 我想利用 Spark 中的并行性,对每个 row 执行以下操作:

    加载位于row.s3path 的 parquet 文件 过滤,以便我们排除 id 在集合 row.ids 中的任何行 计算row.ids 中每个 id 的已删除/排除行数(可选) 将过滤后的数据保存回同一row.s3path以覆盖文件 返回删除的行数(可选)

我尝试过的

我尝试过使用filesToModify.map(row => deleteIDs(row.s3path, row.ids)),其中deleteIDs 看起来像这样:

def deleteIDs(s3path: String, ids: Set[String]): Int = 
    import spark.implicits._
    val data = spark
        .read
        .parquet(s3path)
        .as[DataModel]

    val clean = data
        .filter(not(col("id").isInCollection(ids)))

    // write to a temp directory and then upload to s3 with same
    // prefix as original file to overwrite it
    writeToSingleFile(clean, s3path)

    1 // dummy output for simplicity (otherwise it should correspond to the number of deleted rows)
    

但是,当在 map 操作中执行时,这会导致 NullPointerException。如果我在 map 块之外单独执行它,那么它可以工作,但我不明白为什么它不在其中(与惰性评估有关?)。

【问题讨论】:

【参考方案1】:

传递给deleteIDss3pathids 参数实际上分别不是字符串和集合。它们是列。

为了对这些值进行操作,您可以改为创建一个接受列而不是内在类型的 UDF,或者您可以收集足够小的数据集,以便您可以直接使用 deleteIDs 函数中的值。如果您想利用 Spark 的并行性,前者可能是您最好的选择。

您可以阅读有关 UDF 的信息here

【讨论】:

【参考方案2】:

您收到NullPointerException,因为您尝试从执行程序检索您的火花会话。

这不是明确的,但要执行 spark 操作,您的 DeleteIDs 函数需要检索活动的 spark 会话。为此,它从SparkSession 对象调用方法getActiveSession。但是当从执行程序调用时,这个getActiveSession 方法返回None,如SparkSession's source code 中所述:

返回构建器返回的默认 SparkSession。

注意:在执行器上调用此函数时返回 None

因此,当您的代码开始使用此 None spark 会话时,会抛出 NullPointerException

更一般地,您不能重新创建数据集并在另一个数据集的转换中使用 spark 转换/操作。

所以我看到了两种解决您问题的方法:

要么在不使用 spark 的情况下重写 DeleteIDs 函数的代码,要么使用 parquet4s 修改 parquet 文件。 或将filesToModify 转换为Scala 集合并使用Scala 的map 而不是Spark 的集合。

【讨论】:

以上是关于Spark Dataset - 每行的“编辑”镶木地板文件的主要内容,如果未能解决你的问题,请参考以下文章

Spark中的DataFrame,Dataset和RDD之间的区别

大数据Spark Dataset

如何解决 Spark 中的“aggregateByKey 不是 org.apache.spark.sql.Dataset 的成员”?

spark sql - Dataset数据类型

spark sql - Dataset数据类型

spark结构化数据处理:Spark SQLDataFrame和Dataset