Spark Join 数据框并有条件地更新列
Posted
技术标签:
【中文标题】Spark Join 数据框并有条件地更新列【英文标题】:Spark Join dataframes and conditionally update columns 【发布时间】:2019-05-28 13:47:32 【问题描述】:您好,我有 2 个 spark 数据帧。 第一个:
+---------------+---------------+-------+--------+---------+-----------+------------+---------------+----------------+
|cluster_socio_6|cluster_socio_7|country|latitude|longitude|last_update| uid|segment_comp_11|cluster_comp_170|
+---------------+---------------+-------+--------+---------+-----------+------------+---------------+----------------+
| 2| 2| IT| 41.884| 13.5204| 2019-04-15|d@rNdBkkN-p3| 10| 3|
| 16| 15| IT| 45.5298| 9.03813| 2019-04-15|Ie2Bbs9PUR8h| 15| 4|
| 16| 15| IT| 45.5298| 9.03813| 2019-04-15|Jk2Bbs9PUR8h| 15| 4|
+---------------+---------------+-------+--------+---------+-----------+------------+---------------+----------------+
第二个:
+---------------+---------------+-------+--------+---------+-----------+------------+
|cluster_socio_6|cluster_socio_7|country|latitude|longitude|last_update| uid|
+---------------+---------------+-------+--------+---------+-----------+------------+
| 4| 17| IT| 40.8413| 14.2008| 2019-04-16|ASBuzjKa6nIB|
| 2| 2| IT| 41.884| 15.5204| 2019-04-16|d@rNdBkkN-p3|
| 16| 15| IT| 45.5298| 9.03813| 2019-04-16|Ie2Bbs9PUR8h|
| 16| 15| IT| 45.5298| 9.03813| 2019-04-15|xyzBbs9PUR8h|
+---------------+---------------+-------+--------+---------+-----------+------------+
除了国家、纬度、经度、last_update 和 uid,底部 Df 可以添加不同的列。 这个想法是通过 uid 进行完全连接,更新常用列并保留不常用列。 我怎么能完成这个任务? 谢谢。
【问题讨论】:
要保留的常用列在不同值的情况下如何处理? 我想用底部表格中的值更新公共列 【参考方案1】:这是代码(你没有指定,所以让我们试试 Scala):
// Your dataframes
val upper = ...
val lower = ...
// Find out the columns
val sharedCols = upper.columns.toSet & lower.columns.toSet
val disjointCols = (upper.columns.toSet | lower.columns.toSet) -- sharedCols
val columns = (sharedCols.map(c => coalesce(lower.col(c), upper.col(c)).as(c)) ++ disjointCols.map(c => col(c))).toList
// Join and project
val joined = upper.join(lower, upper.col("uid") === lower.col("uid"), "full_outer").select(columns:_*)
joined.show
【讨论】:
【参考方案2】:如果如您在 cmets 中所说,您希望 始终 使用底部表格中的公共列。你可以做一个简单的连接,在连接之前从 df1 丢失常见的 cloums。
joined_df = df1.drop("some_common_columns").join(df2,Seq("uid"))
这将为您留下只有来自 df1 的常见 cloums 和新加入的df中两个 dfs 的不常见的连接数据
【讨论】:
【参考方案3】:我找到了这个解决方案,以避免由于加入而造成的洗牌。 大家觉得呢? 我可以使用任何改进或 scala 快捷方式吗?
def func_union_name(myCols: Set[String], allCols: Set[String]) =
allCols.toList.map(x => x match
case x if myCols.contains(x) => col(x)
case _ => lit(null).as(x)
)
在定义了上面的函数之后,我做了:
val upper_col = tableToUpdate.columns.toSet
val bottom_col = miniJoin.columns.toSet
val union_cols = tableToUpdate_col ++ miniJoin_col
upper
.select(func_union_name(tableToUpdate_col, union_cols): _*)
.union(bottom.select(func_union_name(bottom_col, union_cols): _*))
.withColumn("max_lu",max(col("last_update"))
.over(Window.partitionBy(col("uid"))))
.filter(col("last_update").geq(col("max_lu")))
.drop(col("max_lu"))
【讨论】:
以上是关于Spark Join 数据框并有条件地更新列的主要内容,如果未能解决你的问题,请参考以下文章
比较两个(py)spark sql数据框并在保持连接列的同时有条件地选择列数据
Spark:有条件地将 col1 值替换为 col2 [重复]