Spark Dataset - 每行的“编辑”镶木地板文件
Posted
技术标签:
【中文标题】Spark Dataset - 每行的“编辑”镶木地板文件【英文标题】:Spark Dataset - "edit" parquet file for each row 【发布时间】:2022-01-03 20:42:03 【问题描述】:上下文
我正在尝试使用 Spark/Scala 来有效地“编辑”多个 parquet 文件(可能超过 50k)。唯一需要进行的编辑是根据给定的一组行 ID 进行删除(即删除记录/行)。
parquet 文件作为分区 DataFrame 存储在 s3 中,其中示例分区如下所示:
s3://mybucket/transformed/year=2021/month=11/day=02/*.snappy.parquet
每个分区可以有超过 100 个 parquet 文件,每个文件的大小在 50mb 到 500mb 之间。
输入
我们得到了一个名为 filesToModify
的火花 Dataset[MyClass]
,它有 2 列:
s3path: String
= s3 中需要编辑的 parquet 文件的完整 s3 路径
ids: Set[String]
= 在位于s3path
的 parquet 文件中需要删除的一组 ID(行)
示例输入数据集filesToModify
:
s3path | ids |
---|---|
s3://mybucket/transformed/year=2021/month=11/day=02/part-1.snappy.parquet | Set("a", "b") |
s3://mybucket/transformed/year=2021/month=11/day=02/part-2.snappy.parquet | Set("b") |
预期行为
鉴于 filesToModify
我想利用 Spark 中的并行性,对每个 row
执行以下操作:
-
加载位于
row.s3path
的 parquet 文件
过滤,以便我们排除 id
在集合 row.ids
中的任何行
计算row.ids
中每个 id 的已删除/排除行数(可选)
将过滤后的数据保存回同一row.s3path
以覆盖文件
返回删除的行数(可选)
我尝试过的
我尝试过使用filesToModify.map(row => deleteIDs(row.s3path, row.ids))
,其中deleteIDs
看起来像这样:
def deleteIDs(s3path: String, ids: Set[String]): Int =
import spark.implicits._
val data = spark
.read
.parquet(s3path)
.as[DataModel]
val clean = data
.filter(not(col("id").isInCollection(ids)))
// write to a temp directory and then upload to s3 with same
// prefix as original file to overwrite it
writeToSingleFile(clean, s3path)
1 // dummy output for simplicity (otherwise it should correspond to the number of deleted rows)
但是,当在 map
操作中执行时,这会导致 NullPointerException
。如果我在 map
块之外单独执行它,那么它可以工作,但我不明白为什么它不在其中(与惰性评估有关?)。
【问题讨论】:
【参考方案1】: 传递给deleteIDs
的s3path
和ids
参数实际上分别不是字符串和集合。它们是列。
为了对这些值进行操作,您可以改为创建一个接受列而不是内在类型的 UDF,或者您可以收集足够小的数据集,以便您可以直接使用 deleteIDs
函数中的值。如果您想利用 Spark 的并行性,前者可能是您最好的选择。
您可以阅读有关 UDF 的信息here
【讨论】:
【参考方案2】:您收到NullPointerException
,因为您尝试从执行程序检索您的火花会话。
这不是明确的,但要执行 spark 操作,您的 DeleteIDs
函数需要检索活动的 spark 会话。为此,它从SparkSession
对象调用方法getActiveSession
。但是当从执行程序调用时,这个getActiveSession
方法返回None
,如SparkSession's source code 中所述:
返回构建器返回的默认 SparkSession。
注意:在执行器上调用此函数时返回 None
因此,当您的代码开始使用此 None
spark 会话时,会抛出 NullPointerException
。
更一般地,您不能重新创建数据集并在另一个数据集的转换中使用 spark 转换/操作。
所以我看到了两种解决您问题的方法:
要么在不使用 spark 的情况下重写DeleteIDs
函数的代码,要么使用 parquet4s 修改 parquet 文件。
或将filesToModify
转换为Scala 集合并使用Scala 的map
而不是Spark 的集合。
【讨论】:
以上是关于Spark Dataset - 每行的“编辑”镶木地板文件的主要内容,如果未能解决你的问题,请参考以下文章
Spark中的DataFrame,Dataset和RDD之间的区别
如何解决 Spark 中的“aggregateByKey 不是 org.apache.spark.sql.Dataset 的成员”?