Spark Dataframe/ Dataset:通用条件累积和

Posted

技术标签:

【中文标题】Spark Dataframe/ Dataset:通用条件累积和【英文标题】:Spark Dataframe/ Dataset: Generic Conditional cumulative sum 【发布时间】:2017-02-18 22:47:34 【问题描述】:

我有一个数据框,它有一些属性(C1 到 C2)、一个偏移量(以天为单位)和一些值(V1、V2)。

val inputDF= spark.sparkContext.parallelize(Seq((1,2,30, 100, -1),(1,2,30, 100, 0), (1,2,30, 100, 1),(11,21,30, 100, -1),(11,21,30, 100, 0), (11,21,30, 100, 1)), 10).toDF("c1", "c2", "v1", "v2", "offset")
inputDF: org.apache.spark.sql.DataFrame = [c1: int, c2: int ... 3 more fields]

scala> inputDF.show
+---+---+---+---+------+
| c1| c2| v1| v2|offset|
+---+---+---+---+------+
|  1|  2| 30|100|    -1|
|  1|  2| 30|100|     0|
|  1|  2| 30|100|     1|
| 11| 21| 30|100|    -1|
| 11| 21| 30|100|     0|
| 11| 21| 30|100|     1|
+---+---+---+---+------+

我需要做的是,计算 (c1,c2) 跨偏移量的 V1、V2 的累积和。

我试过了,但这与适用于任何数据框的通用解决方案相去甚远。

import org.apache.spark.sql.expressions.Window

val groupKey = List("c1", "c2").map(x => col(x.trim))
val orderByKey = List("offset").map(x => col(x.trim))

val w = Window.partitionBy(groupKey: _*).orderBy(orderByKey: _*)

val outputDF = inputDF
  .withColumn("cumulative_v1", sum(inputDF("v1")).over(w))
  .withColumn("cumulative_v2", sum(inputDF("v2")).over(w))

+---+---+---+---+------+----------------------------
| c1| c2| v1| v2|offset|cumulative_v1| cumulative_v2|
+---+---+---+---+------+-------------|--------------|
|  1|  2| 30|100|    -1|30           | 100          |
|  1|  2| 30|100|     0|60           | 200          |
|  1|  2| 30|100|     1|90           | 300          |
| 11| 21| 30|100|    -1|30           | 100          |
| 11| 21| 30|100|     0|60           | 200          |
| 11| 21| 30|100|     1|90           | 300          |
+---+---+---+---+------+-----------------------------

挑战是 [a] 我需要在多个不同的偏移窗口(-1 到 1)、(-10 到 10)、(-30 到 30)或任何其他偏移窗口中执行此操作 [b] 我需要使用这个函数跨多个数据帧/数据集,所以我希望有一个通用函数可以在 RDD/数据集中工作。

关于如何在 Spark 2.0 中实现这一点有什么想法吗?

非常感谢您的帮助。谢谢!

【问题讨论】:

欢迎来到 Stack Overflow!我们是一个问答网站,而不是一个雇佣编码员的服务。请解释到目前为止您已经尝试过什么以及为什么它没有奏效。见:Why is "Can someone help me?" not an actual question? 谢谢。我的解决方案达到了上述结果集。立即添加。 【参考方案1】:

这是一个仅使用数据帧的原始镜头。

import org.apache.spark.sql.expressions.Window

val groupKey = List("c1", "c2").map(x => col(x.trim))
val orderByKey = List("offset").map(x => col(x.trim))

val w = Window.partitionBy(groupKey: _*).orderBy(orderByKey: _*)

val inputDF= spark
  .sparkContext
  .parallelize(Seq((1,2,30, 100, -1),(1,2,3, 100, -2),(1,2,140, 100, 2),(1,2,30, 100, 0), (1,2,30, 100, 1),(11,21,30, 100, -1),(11,21,30, 100, 0), (11,21,30, 100, 1)), 10)
  .toDF("c1", "c2", "v1", "v2", "offset")

val outputDF = inputDF
  .withColumn("cumulative_v1", sum(when($"offset".between(-1, 1), inputDF("v1")).otherwise(0)).over(w))
  .withColumn("cumulative_v3", sum(when($"offset".between(-2, 2), inputDF("v1")).otherwise(0)).over(w))
  .withColumn("cumulative_v2", sum(inputDF("v2")).over(w))

这会为不同窗口生成单个“值”的累积总和。

scala> outputDF.show
+---+---+---+---+------+-------------+-------------+-------------+              
| c1| c2| v1| v2|offset|cumulative_v1|cumulative_v3|cumulative_v2|
+---+---+---+---+------+-------------+-------------+-------------+
|  1|  2|  3|100|    -2|            0|            0|          100|
|  1|  2| 30|100|    -1|           30|           30|          200|
|  1|  2| 30|100|     0|           60|           60|          300|
|  1|  2| 30|100|     1|           90|           90|          400|
|  1|  2|140|100|     2|           90|           90|          500|
| 11| 21| 30|100|    -1|           30|           30|          100|
| 11| 21| 30|100|     0|           60|           60|          200|
| 11| 21| 30|100|     1|           90|           90|          300|
+---+---+---+---+------+-------------+-------------+-------------+

这种方法的几个缺点 - [1] 对于每个条件窗口 (-1,1)、(-2,2) 或任何一个 (from_offset, to_offset),需要分别调用 sum()。 [2] 这不是通用函数。

我知道 spark 接受这样的聚合函数的可变列列表 -

val exprs = Map("v1" -> "sum", "v2" -> "sum")

但我不确定如何为具有可变条件的窗口函数扩展它。我仍然很想知道是否有更好的模块化/可重用函数我们可以编写来解决这个问题。

【讨论】:

【参考方案2】:

解决此问题的另一种通用方法是使用 foldLeft,如此处所述 - https://***.com/a/44532867/7059145

【讨论】:

以上是关于Spark Dataframe/ Dataset:通用条件累积和的主要内容,如果未能解决你的问题,请参考以下文章

Spark-SQL——DataFrame与Dataset

Spark——RDD和DataFrame和DataSet三者间的区别

`filter`/`where` 有条件地应用到 Spark `Dataset`/`Dataframe`

Spark中的DataFrame,Dataset和RDD之间的区别

Spark Dataframe/ Dataset:通用条件累积和

UDAF 合并 Spark DataSet/Dataframe 中第一个 orderdby 的行