在 Spark scala 中更新数据框的最佳方法
Posted
技术标签:
【中文标题】在 Spark scala 中更新数据框的最佳方法【英文标题】:Best way to update a dataframe in Spark scala 【发布时间】:2020-02-04 16:20:38 【问题描述】:考虑两个 Dataframe data_df
和 update_df
。这两个数据框具有相同的架构(key, update_time, bunch of columns)
。
我知道用update_df
“更新”data_df
的两种(主要)方法
-
完全外连接
我加入两个数据框(在键上),然后选择适当的列(根据 update_timestamp 的值)
最大超过分区
合并两个数据帧,按键计算最大 update_timestamp,然后仅过滤等于该最大值的行。
以下是问题:
还有其他方法吗? 哪种方法最好?为什么?我已经与一些开放数据进行了比较 这是加入代码
var join_df = data_df.alias("data").join(maj_df.alias("maj"), Seq("key"), "outer")
var res_df = join_df.where( $"data.update_time" > $"maj.update_time" || $"maj.update_time".isNull)
.select(col("data.*"))
.union(
join_df.where( $"data.update_time" < $"maj.update_time" || $"data.update_time".isNull)
.select(col("maj.*")))
这是窗口代码
import org.apache.spark.sql.expressions._
val byKey = Window.partitionBy($"key") // orderBy is implicit here
res_df = data_df.union(maj_df)
.withColumn("max_version", max("update_time").over(byKey))
.where($"update_time" === $"max_version")
如果需要,我可以在此处粘贴您的 DAG 和计划,但它们非常大
我的第一个猜测是连接解决方案可能是最好的方法,但它只有在 update
数据框每个键只有一个版本时才有效。
PS:我知道 Apache Delta 解决方案,但遗憾的是我无法使用它。
【问题讨论】:
什么是 Apache Delta? docs.delta.io/0.4.0/delta-update.html 【参考方案1】:下面是一种只加入键的方法,以尽量减少过滤器和加入命令使用的内存量。
///Two records, one with a change, one no change
val originalDF = spark.sql("select 'aa' as Key, 'Joe' as Name").unionAll(spark.sql("select 'cc' as Key, 'Doe' as Name"))
///Two records, one change, one new
val updateDF = = spark.sql("select 'aa' as Key, 'Aoe' as Name").unionAll(spark.sql("select 'bb' as Key, 'Moe' as Name"))
///Make new DFs of each just for Key
val originalKeyDF = originalDF.selectExpr("Key")
val updateKeyDF = updateDF.selectExpr("Key")
///Find the keys that are similar between both
val joinKeyDF = updateKeyDF.join(originalKeyDF, updateKeyDF("Key") === originalKeyDF("Key"), "inner")
///Turn the known keys into an Array
val joinKeyArray = joinKeyDF.select(originalKeyDF("Key")).rdd.map(x=>x.mkString).collect
///Filter the rows from original that are not found in the new file
val originalNoChangeDF = originalDF.where(!($"Key".isin(joinKeyArray:_*)))
///Update the output with unchanged records, update records, and new records
val finalDF = originalNoChangeDF.unionAll(updateDF)
【讨论】:
以上是关于在 Spark scala 中更新数据框的最佳方法的主要内容,如果未能解决你的问题,请参考以下文章
在 Scala Spark 中使用数据框的朴素贝叶斯多项式文本分类器
使用数据框的子集和 spark/scala 中的两个特定字段过滤数据框 [关闭]
在 Scala 中设计和并行化 Spark 应用程序的最佳方法 [关闭]
如何在scala spark中将数据框的特定列与另一个列连接[重复]