在 Spark scala 中更新数据框的最佳方法

Posted

技术标签:

【中文标题】在 Spark scala 中更新数据框的最佳方法【英文标题】:Best way to update a dataframe in Spark scala 【发布时间】:2020-02-04 16:20:38 【问题描述】:

考虑两个 Dataframe data_dfupdate_df。这两个数据框具有相同的架构(key, update_time, bunch of columns)

我知道用update_df“更新”data_df 的两种(主要)方法

    完全外连接 我加入两个数据框(在键上),然后选择适当的列(根据 update_timestamp 的值) 最大超过分区 合并两个数据帧,按键计算最大 update_timestamp,然后仅过滤等于该最大值的行。

以下是问题:

还有其他方法吗? 哪种方法最好?为什么?

我已经与一些开放数据进行了比较 这是加入代码

var join_df = data_df.alias("data").join(maj_df.alias("maj"), Seq("key"), "outer")
var res_df = join_df.where( $"data.update_time" > $"maj.update_time" || $"maj.update_time".isNull)
             .select(col("data.*"))
         .union(
         join_df.where( $"data.update_time" < $"maj.update_time" || $"data.update_time".isNull)
             .select(col("maj.*")))

这是窗口代码

import org.apache.spark.sql.expressions._

val byKey = Window.partitionBy($"key") // orderBy is implicit here

res_df = data_df.union(maj_df)
                .withColumn("max_version", max("update_time").over(byKey))
                .where($"update_time" === $"max_version")

如果需要,我可以在此处粘贴您的 DAG 和计划,但它们非常大

我的第一个猜测是连接解决方​​案可能是最好的方法,但它只有在 update 数据框每个键只有一个版本时才有效。


PS:我知道 Apache Delta 解决方案,但遗憾的是我无法使用它。

【问题讨论】:

什么是 Apache Delta? docs.delta.io/0.4.0/delta-update.html 【参考方案1】:

下面是一种只加入键的方法,以尽量减少过滤器和加入命令使用的内存量。

///Two records, one with a change, one no change
val originalDF = spark.sql("select 'aa' as Key, 'Joe' as Name").unionAll(spark.sql("select 'cc' as Key, 'Doe' as Name")) 


///Two records, one change, one new
val updateDF =  = spark.sql("select 'aa' as Key, 'Aoe' as Name").unionAll(spark.sql("select 'bb' as Key, 'Moe' as Name"))

///Make new DFs of each just for Key   
val originalKeyDF = originalDF.selectExpr("Key")
val updateKeyDF = updateDF.selectExpr("Key")

///Find the keys that are similar between both
val joinKeyDF = updateKeyDF.join(originalKeyDF, updateKeyDF("Key") === originalKeyDF("Key"), "inner")
///Turn the known keys into an Array
val joinKeyArray = joinKeyDF.select(originalKeyDF("Key")).rdd.map(x=>x.mkString).collect

///Filter the rows from original that are not found in the new file
val originalNoChangeDF = originalDF.where(!($"Key".isin(joinKeyArray:_*)))

///Update the output with unchanged records, update records, and new records
val finalDF = originalNoChangeDF.unionAll(updateDF)

【讨论】:

以上是关于在 Spark scala 中更新数据框的最佳方法的主要内容,如果未能解决你的问题,请参考以下文章

在 Scala Spark 中使用数据框的朴素贝叶斯多项式文本分类器

使用数据框的子集和 spark/scala 中的两个特定字段过滤数据框 [关闭]

在 Scala 中设计和并行化 Spark 应用程序的最佳方法 [关闭]

如何在scala spark中将数据框的特定列与另一个列连接[重复]

使用 spark 和 scala 进行连接计数时获得性能的最佳方法

Apache Spark (scala) + python/R 数据分析工作流程