参考另一个数组列的 Spark 数据帧聚合

Posted

技术标签:

【中文标题】参考另一个数组列的 Spark 数据帧聚合【英文标题】:Spark dataframe aggregation with reference to another array column 【发布时间】:2021-11-26 12:18:09 【问题描述】:

通过查看索引数组聚合双精度数组时,我遇到了与性能相关的问题。 我的意思是。原始数据框看起来像这样:

original Dataframe 

| id | prop1        | values                  |
|----|--------------|-------------------------|
|  1 | [2,5,1,3]    |   [ 0.1, 0.5, 0.7, 0.8] |
|  2 | [2,1]        |   [ 0.2, 0.3 ]          |
|  1 | [1,5]        |   [ 0.4, 0.3 ]          |
|  2 | [3,2]        |   [ 0.0, 0.1 ]          |

so in the column 2 which is prop1 is an int array having values within range of 1 to 5 but not in a order and there can be missing numbers within array.

Prop1 int 数组类似于双精度数组值的索引 我的意思是第 1 行在爆炸时看起来像下面这样

| id | prop1 | values |
|----|-------|--------|
|  1 | 2     |   0.1  |
|  1 | 5     |   0.5  |
|  1 | 1     |   0.7  |
|  1 | 3     |   0.8  |

最后一个问题,

所以我需要通过查看索引数组和列 id 来聚合双精度数组的值

所以结果应该是

| id | prop1          | values                   | 
|----|----------------|--------------------------| 
|  1 | [2,5,1,3]      |   [ 0.1, 0.8, 1.1, 0.8 ] | 
|  2 | [2,1,3]        |   [ 0.3, 0.3, 0.0 ]      | 


Below code I am using to extract the values by index and pivot right before merging them to array

//dummy dataframe to get the sequence of 5 but the upper end is dynamic value and that can extend till 300k
var df = (1 to 5).toDF("prop1")

//joining original Df by prop1 column 
var stgDf = originalDf.join(df,originalDf.col("prop1") ===  df.col("prop1"),"inner")

// pivoting the values by index
var pivotDf = stgDf.groupBy("id")
             .pivot("prop1").agg(first("values"))

 // now aggregating the pivoted  values by id
 var expr = pivtoDf.columns.map(sum(_))
 var pivotDf.groupBy("id").agg(expr.head,expr.tail:_*)

 //then grouping back into array by id

这个解决方案我使用爆炸 prop1 和 value,它确实适用于几行,但在实际问题中,两列的数组每个都可以超过 500k 值,没有。每个 id 的行数可以超过 3000 万

如果有人能在这方面寻求帮助,那就太好了。应用程序是使用 spark 2.4 在 scala 中构建的

提前致谢

【问题讨论】:

那么问题出在哪里? 30m不算什么。它的大数据。 也显示您的代码 @thebluephantom 请检查已编辑的帖子和代码。问题是如何按索引聚合双数组的值,索引是按列 id 的数组。每个 id 有 30m,数据框可能存在 1000 个 id。请检查已编辑的帖子,看看您是否可以提供帮助。谢谢 好的,这是大数据。我稍后再看 还需要帮助吗? 【参考方案1】:

适用于 v3.x,而不是 v2.4。升级,太难了。

一些严重的数据争吵!

可能有更好的方法,但它是可扩展的。可能需要很多分区。

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Column

val arrayStructureData = Seq(
Row(1,List(2,5,1,3),List(0.1, 0.5, 0.7, 0.8)),
Row(2,List(2,1),List(0.2, 0.3)),
Row(1,List(1,5),List(0.4, 0.3)),
Row(2,List(3,2),List(0.0, 0.1)) 
)
// Just a single StructType for the Row
val arrayStructureSchema = new StructType()
    .add("id",IntegerType)
    .add("prop1", ArrayType(IntegerType))
    .add("values", ArrayType(DoubleType))
val df = spark.createDataFrame(spark.sparkContext.parallelize(arrayStructureData),arrayStructureSchema)
df.printSchema()
df.show()

val df2 = df.withColumn(
  "jCols",
  zip_with(
    col("prop1"),
    col("values"),
      // Should be a struct really, but...array used. zip_with not available in v2.4!
      (left: Column, right: Column) => array(left, right)
  )
).drop('prop1).drop('values)

df2.show(false)
df2.printSchema()

val df3 = df2.groupBy("id").agg(collect_list("jCols").as("jCols"))
df3.printSchema()
df3.show(false)

val df4 = df3.select($"id",flatten($"jCols").as("jCols"))
df4.show(false)
df4.printSchema()

val df5 = df4.withColumn("ExjCols", explode($"jCols")).drop("jCols")
df5.show(false)
df5.printSchema()

val df6 = df5.select(col("id"),col("ExjCols")(0).as("prop1"),col("ExjCols")(1).as("values"))
df6.show(false)
df6.printSchema()

val df7 = df6.groupBy("id", "prop1").sum("values").toDF("id","prop1","values") 
df7.show(false)
df7.printSchema()

val df8 = df7.withColumn("combined", array($"prop1", $"values"))
df8.show(false)
df8.printSchema()

val df9 = df8.groupBy("id").agg(collect_list("combined").as("propN"))
df9.show(false)
df9.printSchema()

val res = df9.withColumn("prop1",expr("transform(propN, x -> x[0])")).withColumn("values",expr("transform(propN, x -> x[1])")).drop('propN)
res.show(false)

返回:

+---+--------------------+-------------------------------+
|id |prop1               |values                         |
+---+--------------------+-------------------------------+
|1  |[2.0, 5.0, 1.0, 3.0]|[0.1, 0.8, 1.1, 0.8]           |
|2  |[2.0, 1.0, 3.0]     |[0.30000000000000004, 0.3, 0.0]|
+---+--------------------+-------------------------------+

不知道为什么会出现精度 0.3000...,但确实如此。还更正了示例,它有一些错误。

我只能假设 SO 现在不太受欢迎,因为它需要一段时间才能得到答案。

【讨论】:

以上是关于参考另一个数组列的 Spark 数据帧聚合的主要内容,如果未能解决你的问题,请参考以下文章

Spark中具有固定向量的数据帧行的点积

将一个数据帧的数组列与scala中另一个数据帧的数组列的子集进行比较

提取列值并将其作为 Spark 数据帧中的数组分配给另一列

处理依赖于 Spark 数据集中另一列的逗号分隔列

Spark:数据帧聚合(Scala)

在 Python/PySpark 中 Spark 复制数据框列的最佳实践?