Spark Join 数据框并有条件地更新列

Posted

技术标签:

【中文标题】Spark Join 数据框并有条件地更新列【英文标题】:Spark Join dataframes and conditionally update columns 【发布时间】:2019-05-28 13:47:32 【问题描述】:

您好,我有 2 个 spark 数据帧。 第一个:

+---------------+---------------+-------+--------+---------+-----------+------------+---------------+----------------+
|cluster_socio_6|cluster_socio_7|country|latitude|longitude|last_update|         uid|segment_comp_11|cluster_comp_170|
+---------------+---------------+-------+--------+---------+-----------+------------+---------------+----------------+
|              2|              2|     IT|  41.884|  13.5204| 2019-04-15|d@rNdBkkN-p3|             10|               3|
|             16|             15|     IT| 45.5298|  9.03813| 2019-04-15|Ie2Bbs9PUR8h|             15|               4|
|             16|             15|     IT| 45.5298|  9.03813| 2019-04-15|Jk2Bbs9PUR8h|             15|               4|
+---------------+---------------+-------+--------+---------+-----------+------------+---------------+----------------+  

第二个:

+---------------+---------------+-------+--------+---------+-----------+------------+
|cluster_socio_6|cluster_socio_7|country|latitude|longitude|last_update|         uid|
+---------------+---------------+-------+--------+---------+-----------+------------+
|              4|             17|     IT| 40.8413|  14.2008| 2019-04-16|ASBuzjKa6nIB|
|              2|              2|     IT|  41.884|  15.5204| 2019-04-16|d@rNdBkkN-p3|
|             16|             15|     IT| 45.5298|  9.03813| 2019-04-16|Ie2Bbs9PUR8h|
|             16|             15|     IT| 45.5298|  9.03813| 2019-04-15|xyzBbs9PUR8h|
+---------------+---------------+-------+--------+---------+-----------+------------+  

除了国家、纬度、经度、last_update 和 uid,底部 Df 可以添加不同的列。 这个想法是通过 uid 进行完全连接,更新常用列并保留不常用列。 我怎么能完成这个任务? 谢谢。

【问题讨论】:

要保留的常用列在不同值的情况下如何处理? 我想用底部表格中的值更新公共列 【参考方案1】:

这是代码(你没有指定,所以让我们试试 Scala):

// Your dataframes
val upper = ...
val lower = ...

// Find out the columns
val sharedCols = upper.columns.toSet & lower.columns.toSet
val disjointCols = (upper.columns.toSet | lower.columns.toSet) -- sharedCols
val columns = (sharedCols.map(c => coalesce(lower.col(c), upper.col(c)).as(c)) ++ disjointCols.map(c => col(c))).toList

// Join and project    
val joined = upper.join(lower, upper.col("uid") === lower.col("uid"), "full_outer").select(columns:_*)
joined.show

【讨论】:

【参考方案2】:

如果如您在 cmets 中所说,您希望 始终 使用底部表格中的公共列。你可以做一个简单的连接,在连接之前从 df1 丢失常见的 cloums。

joined_df = df1.drop("some_common_columns").join(df2,Seq("uid"))

这将为您留下只有来自 df1 的常见 cloums 和新加入的df中两个 dfs 的不常见的连接数据

【讨论】:

【参考方案3】:

我找到了这个解决方案,以避免由于加入而造成的洗牌。 大家觉得呢? 我可以使用任何改进或 scala 快捷方式吗?

    def func_union_name(myCols: Set[String], allCols: Set[String]) = 
        allCols.toList.map(x => x match 
          case x if myCols.contains(x) => col(x)
          case _ => lit(null).as(x)
        )
        

在定义了上面的函数之后,我做了:

          val upper_col = tableToUpdate.columns.toSet
          val bottom_col = miniJoin.columns.toSet
          val union_cols = tableToUpdate_col ++ miniJoin_col

              upper
                .select(func_union_name(tableToUpdate_col, union_cols): _*)
                .union(bottom.select(func_union_name(bottom_col, union_cols): _*))            
                .withColumn("max_lu",max(col("last_update"))
                                      .over(Window.partitionBy(col("uid"))))
                .filter(col("last_update").geq(col("max_lu")))
                .drop(col("max_lu"))

【讨论】:

以上是关于Spark Join 数据框并有条件地更新列的主要内容,如果未能解决你的问题,请参考以下文章

比较两个(py)spark sql数据框并在保持连接列的同时有条件地选择列数据

Spark:有条件地将 col1 值替换为 col2 [重复]

在 Spark 中使用相应的列名(有条件地)更改数据框

Apache Spark join 操作扩展性差

NotNull 条件不适用于 spark 数据框 scala 中的 withColumn 条件

如何在spark scala数据框中更新嵌套列的xml值