跨数据集行的数组元素总和 - Spark Scala

Posted

技术标签:

【中文标题】跨数据集行的数组元素总和 - Spark Scala【英文标题】:Element-wise sum of array across rows of a dataset - Spark Scala 【发布时间】:2019-06-26 10:11:38 【问题描述】:

我正在尝试根据“id”列对以下数据集进行分组,并按元素对“values”列中的数组求和。我如何在 Spark 中使用 Scala 做到这一点?

输入:(2列数据集,String类型column1和Array[Int]类型column2)

| id | values |
---------------
| A | [12,61,23,43]
| A | [43,11,24,45]
| B | [32,12,53,21]
| C | [11,12,13,14]
| C | [43,52,12,52]
| B | [33,21,15,24]

预期输出:(数据集或数据框)

| id | values |
---------------
| A | [55,72,47,88]
| B | [65,33,68,45]
| C | [54,64,25,66]

注意: 结果必须是灵活和动态的。也就是说,即使有 1000 列或者即使文件是几个 TB 或 PB,该解决方案仍然应该保持良好。

【问题讨论】:

类似 sqlDF.groupBy("id").sum("values").as("sum_values").select("id", "sum_values") 的东西应该适合你。跨度> 【参考方案1】:

当您说它必须灵活时,我有点不确定您的意思是什么,但在我的脑海中,我可以想到几种方法。第一个(在我看来是最漂亮的)一个使用udf

// Creating a small test example
val testDF = spark.sparkContext.parallelize(Seq(("a", Seq(1,2,3)), ("a", Seq(4,5,6)), ("b", Seq(1,3,4)))).toDF("id", "arr")
val sum_arr = udf((list: Seq[Seq[Int]]) => list.transpose.map(arr => arr.sum))

testDF
  .groupBy('id)
  .agg(sum_arr(collect_list('arr)) as "summed_values")

但是,如果您有数十亿个相同的 id,collect_list 当然会是个问题。在这种情况下,您可以执行以下操作:

testDF
  .flatMapcase Row(id: String, list: Seq[Int]) => list.indices.map(index => (id, index, list(index)))
  .toDF("id", "arr_index", "arr_element")
  .groupBy('id, 'arr_index)
  .agg(sum("arr_element") as "sum")
  .groupBy('id)
  .agg(collect_list('sum) as "summed_values")

【讨论】:

【参考方案2】:

以下单行解决方案对我有用

ds.groupBy("Country").agg(array((0 until n).map(i => sum(col("Values").getItem(i))) :_* ) as "Values")

【讨论】:

以上是关于跨数据集行的数组元素总和 - Spark Scala的主要内容,如果未能解决你的问题,请参考以下文章

将每个文件激发到数据集行

检查给定行和列总和是不是只有 2 行的二进制矩阵

如何将结果集行作为索引子数组添加到结果数组中?

如何在 spark sql 中对数组进行成员操作?

数组中元素的组合返回总和

Pymongo MapReduce 作为子数组元素的总和