scala spark减少groupby中的列表
Posted
技术标签:
【中文标题】scala spark减少groupby中的列表【英文标题】:scala spark reduce list in groupby 【发布时间】:2021-12-02 22:49:27 【问题描述】:我有两列的 spark DataFrame
colA colB
1 3
1 2
2 4
2 5
2 1
我想 groupBy colA 并为每个组迭代 colB 列表,这样:
res = 0
for i in collect_list(col("colB")):
res += i * (3+res)
返回值为res
所以我明白了:
colA colB
1 24
2 78
如何在 scala 中做到这一点?
【问题讨论】:
使用reduceByKey
感谢您为我指明方向。很乐意为我提供一个最低限度的工作示例,例如列表(8,4)?谢谢
【参考方案1】:
您可以通过以下方式达到您想要的结果:
val df = Seq((1,3), (1,2), (2,4), (2,5), (2,1)).toDF("colA", "colB")
val retDf = df
.groupBy("colA")
.agg(
aggregate(
collect_list("colB"), lit(0), (acc, nxt) => nxt * (acc + 3)
) as "colB")
但是,您需要非常小心,因为 Spark 上的数据是分布式的。如果数据在读入 Spark 后被打乱,则不能保证它会以相同的顺序被收集。在玩具示例中,collect_list("colB")
将返回Seq(3,2)
,其中colA
是1
。但是,如果在早期阶段有任何洗牌,collect_list
也可以返回Seq(2,3)
,这将为您提供27
,而不是所需的24
。您需要为您的数据提供一些元数据,您可以使用这些元数据来确保按照您期望的顺序处理这些数据,例如使用 monotonicallyIncreasingId
方法。
【讨论】:
您自己承认答案不正确。 怎么样?它解决了提出的问题。我不知道使用该解决方案的上下文,因此我提供了必要的警告,并指出了如何解决该问题。 查看您的预订...不确定...【参考方案2】:不会丢失排序的 RDD 方法。
%scala
val rdd1 = spark.sparkContext.parallelize(Seq((1,3), (1,3), (2,4), (2,5), (2,1))).zipWithIndex().map(x => ((x._1._1), (x._1._2, x._2)) )
val rdd2 = rdd1.groupByKey
// Convert to Array.
val rdd3 = rdd2.map(x => (x._1, x._2.toArray))
val rdd4 = rdd3.map(x => (x._1, x._2.sortBy(_._2)))
val rdd5 = rdd4.mapValues(v => v.map(_._1))
rdd5.collect()
val res = rdd5.map(x => (x._1, x._2.fold(0)((acc, nxt) => nxt * (acc + 3) )))
res.collect()
返回:
res201: Array[(Int, Int)] = Array((1,24), (2,78))
根据需要从 DF 转换到 DF。
【讨论】:
有点奇怪。 我想知道公式是否正确,我认为第一个值应该是 33 由于 res += 需要 + acc 添加。 foldLeft 也是可能的。以上是关于scala spark减少groupby中的列表的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Spark Scala 中将 WrappedArray 转换为 List?
在 groupBy scala spark 之后保留最近的行