scala spark减少groupby中的列表

Posted

技术标签:

【中文标题】scala spark减少groupby中的列表【英文标题】:scala spark reduce list in groupby 【发布时间】:2021-12-02 22:49:27 【问题描述】:

我有两列的 spark DataFrame

colA colB
1    3
1    2
2    4
2    5
2    1

我想 groupBy colA 并为每个组迭代 colB 列表,这样:

res = 0
for i in collect_list(col("colB")):
    res += i * (3+res)

返回值为res

所以我明白了:

colA colB
1    24
2    78

如何在 scala 中做到这一点?

【问题讨论】:

使用reduceByKey 感谢您为我指明方向。很乐意为我提供一个最低限度的工作示例,例如列表(8,4)?谢谢 【参考方案1】:

您可以通过以下方式达到您想要的结果:


val df = Seq((1,3), (1,2), (2,4), (2,5), (2,1)).toDF("colA", "colB")
val retDf = df
  .groupBy("colA")
  .agg(
    aggregate(
      collect_list("colB"), lit(0), (acc, nxt) => nxt * (acc + 3)
    ) as "colB")

但是,您需要非常小心,因为 Spark 上的数据是分布式的。如果数据在读入 Spark 后被打乱,则不能保证它会以相同的顺序被收集。在玩具示例中,collect_list("colB") 将返回Seq(3,2),其中colA1。但是,如果在早期阶段有任何洗牌,collect_list 也可以返回Seq(2,3),这将为您提供27,而不是所需的24。您需要为您的数据提供一些元数据,您可以使用这些元数据来确保按照您期望的顺序处理这些数据,例如使用 monotonicallyIncreasingId 方法。

【讨论】:

您自己承认答案不正确。 怎么样?它解决了提出的问题。我不知道使用该解决方案的上下文,因此我提供了必要的警告,并指出了如何解决该问题。 查看您的预订...不确定...【参考方案2】:

不会丢失排序的 RDD 方法。

%scala
val rdd1 = spark.sparkContext.parallelize(Seq((1,3), (1,3), (2,4), (2,5), (2,1))).zipWithIndex().map(x => ((x._1._1), (x._1._2, x._2)) )
val rdd2 = rdd1.groupByKey
// Convert to Array.
val rdd3 = rdd2.map(x => (x._1, x._2.toArray)) 
val rdd4 = rdd3.map(x => (x._1, x._2.sortBy(_._2)))
val rdd5 = rdd4.mapValues(v => v.map(_._1))
rdd5.collect()

val res = rdd5.map(x => (x._1, x._2.fold(0)((acc, nxt) => nxt * (acc + 3) )))
res.collect()

返回:

res201: Array[(Int, Int)] = Array((1,24), (2,78))

根据需要从 DF 转换到 DF。

【讨论】:

有点奇怪。 我想知道公式是否正确,我认为第一个值应该是 33 由于 res += 需要 + acc 添加。 foldLeft 也是可能的。

以上是关于scala spark减少groupby中的列表的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark Scala 中将 WrappedArray 转换为 List?

Scala groupBy 项目列表中的所有元素

在 groupBy scala spark 之后保留最近的行

Scala Spark groupBy/Agg 函数

Spark/Scala 1.6 如何使用 dataframe groupby agg 来实现以下逻辑?

Spark - 使用 groupBy 减少组合数量