使用列名数组中的 UDF 将列合并到单个映射中

Posted

技术标签:

【中文标题】使用列名数组中的 UDF 将列合并到单个映射中【英文标题】:Merge columns into single map with UDF from Array of Column Names 【发布时间】:2019-06-28 07:29:18 【问题描述】:

所以我有一个需要将值相加的数据框,然后放入 Map[String,Long] 格式以保存到 Cassandra。

下面的代码有效,但是我想知道是否可以基于列的抽象列表创建地图。 (查看source code 的功能只会让我更加困惑)。

var cols = Array("key", "v1", "v2")
var df = Seq(("a",1,0),("b",1,0),("a",1,1),("b",0,0)).toDF(cols: _*)
val df1 = df.groupBy(col(cols(0))).
  agg(map(lit(cols(1)), sum(col(cols(1))), lit(cols(2)), sum(col(cols(2)))) as "map")

这是我想要的数据框格式和上面代码的当前给定结果:

scala> df1.show(false)
+---+---------------------+
|key|map                  |
+---+---------------------+
|b  |Map(v1 -> 1, v2 -> 0)|
|a  |Map(v1 -> 2, v2 -> 1)|
+---+---------------------+

我希望看到一个函数,它可以返回与上面相同但能够以编程方式根据名称放置列。例如:

var columnNames = Array("v1", "v2")
df.groupBy(col(cols(0))).agg(create_sum_map(columnNames) as "map")

这在 Spark 中是否可以远程实现?

【问题讨论】:

【参考方案1】:

无需使用慢速UDF,您可以使用纯内置 Spark 函数和可变参数来实现这一点,参见例如Spark SQL: apply aggregate functions to a list of columns。此解决方案需要构建一个可以应用聚合的列列表。在这里,它有点复杂,因为您希望在最终输出中使用map,这需要一个额外的步骤。

首先创建要在聚合中使用的表达式(列):

val exprs = cols.tail.flatMap(c => Seq(lit(c), sum(col(c))))

应用组并使用创建的exprs

val df2 = df.groupBy(col(cols.head)).agg(exprs.head, exprs.tail:_*)
  .select(col(cols.head), map(cols.tail.flatMap(c => Seq(col(c), col(s"sum($c)"))):_*).as("map"))

创建map 需要额外的selectcols.tail.flatMap(c => Seq(col(c), col(s"sum($c)")) 只是应添加到map 的新列的列表。

结果输出与之前相同:

+---+---------------------+
|key|map                  |
+---+---------------------+
|b  |Map(v1 -> 1, v2 -> 0)|
|a  |Map(v1 -> 2, v2 -> 1)|
+---+---------------------+

【讨论】:

啊哈!所以Array:_* 把数组变成了一组参数。我想知道为什么 Spark 不允许 .agg(exprs:_*)。这很有帮助 - 谢谢。【参考方案2】:

所以我想出了如何根据@Shaido 的回答生成我想要的答案的结果。

def create_sum_map(cols: Array[String]): Column = 
  map(cols.flatMap(c => Seq(lit(c), sum(col(c)))):_*)

df.groupBy(col(cols.head)).agg(create_sum_map(columnNames) as "map")

我认为这是可行的,因为在 .agg() 函数中的 create_sum_map() 中存在带有受影响列的 sum(Column)

【讨论】:

以上是关于使用列名数组中的 UDF 将列合并到单个映射中的主要内容,如果未能解决你的问题,请参考以下文章

根据几个条件将列从一个数据帧映射到另一个数据帧,以考虑存在的多个映射中的一个映射

如何将列中的所有数据移动到单个列(不合并),然后拆分为R中的新列?

在 React 中映射 JSON 数组时如何使用 onClick 定位列表中的单个项目

将数据表中的列名映射到 C# 中的另一个数据表

如何使函数在映射数组中的单个项目而不是数组中的每个项目上执行?

使用 withColumn 和 callUDF 将列附加到数据框