SparkSQL在分组后从数据帧中获取之前和之后的行

Posted

技术标签:

【中文标题】SparkSQL在分组后从数据帧中获取之前和之后的行【英文标题】:SparkSQL Fetch rows before and after from dataframe after grouping 【发布时间】:2016-01-21 01:33:51 【问题描述】:

给定这个 Dataframe df

 +-----------+--------------------+-------------+-------+
|CustNumb   |        PurchaseDate|     price| activeFlag|
+-----------+--------------------+-------------+-------+
|          3|2013-07-17 00:00:...|         17.9|    0|
|          3|2013-08-27 00:00:...|        61.13|    0|
|          3|2013-08-28 00:00:...|        25.07|    1|
|          3|2013-08-29 00:00:...|        24.23|    0|
|          3|2013-09-06 00:00:...|         3.94|    0|
|         20|2013-02-28 00:00:...|       354.64|    0|
|         20|2013-04-07 00:00:...|         15.0|    0|
|         20|2013-05-10 00:00:...|        545.0|    0|
|         28|2013-02-17 00:00:...|        190.0|    0|
|         28|2013-04-08 00:00:...|         20.0|    0|
|         28|2013-04-16 00:00:...|         89.0|    0|
|         28|2013-05-18 00:00:...|        260.0|    0|
|         28|2013-06-06 00:00:...|       586.57|    1|
|         28|2013-06-09 00:00:...|        250.0|    0|

当它发现一个非活动标志“1”时,我想得到一个结果,它返回按购买日期订购前后两行的平均价格。这是我正在寻找的结果:

+-----------+--------------------+-------------+-------+---------------+
|CustNumb   |        PurchaseDate|     price| activeFlag| OutputVal |
+-----------+--------------------+-------------+-------+------------+
|          3|2013-07-17 00:00:...|         17.9|    0|   17.9
|          3|2013-08-27 00:00:...|        61.13|    0|   61.13
|          3|2013-08-28 00:00:...|        25.07|    1|   26.8 (avg of 2 prices before and 2 after)
|          3|2013-08-29 00:00:...|        24.23|    0|   24.23
|          3|2013-09-06 00:00:...|         3.94|    0|   3.94

|         20|2013-02-28 00:00:...|       354.64|    0|   354.64
|         20|2013-04-07 00:00:...|         15.0|    0|   15.0
|         20|2013-05-10 00:00:...|        545.0|    0|   545.0

|         28|2013-02-17 00:00:...|        190.0|    0|   190.0
|         28|2013-04-08 00:00:...|         20.0|    0|   20.0
|         28|2013-04-16 00:00:...|         89.0|    0|   89.0
|         28|2013-05-18 00:00:...|        260.0|    0|   260.0
|         28|2013-06-06 00:00:...|       586.57|    1|   199.6 (avg of 2 prices before and 1 after)
|         28|2013-06-09 00:00:...|        250.0|    0|   250

在上面的 custNum 3 和 28 示例中,我有 activeFlag 1,所以如果存在相同的 custNumb,我需要计算前后 2 行的平均值。

我正在考虑在数据帧上使用窗口函数,但由于我对 spark 编程还很陌生,所以无法在 spark 中解决这个问题

val w = Window.partitionBy("CustNumb").orderBy("PurchaseDate")

我怎样才能做到这一点,是否可以通过 Window 函数或任何更好的方法来实现?

【问题讨论】:

【参考方案1】:

如果您已经有窗口,像这样的简单条件应该可以正常工作:

val cond = ($"activeFlag" === 1) && (lag($"activeFlag", 1).over(w) === 0)

// Windows covering rows before and after
val before = w.rowsBetween(-2, -1)
val after = w.rowsBetween(1, 2)

// Expression with sum of rows and number of rows 
val sumPrice = sum($"price").over(before) + sum($"price").over(after)
val countPrice = sum($"ones_").over(before) + sum($"ones_").over(after)

val expr = when(cond, sumPrice / countPrice).otherwise($"price")

df.withColumn("ones_", lit(1)).withColumn("outputVal", expr)

【讨论】:

Zero323。感谢您的解决方案,但您能解释一下查找 avgprice 的滞后功能吗? lag($"price,1).over(w) 返回什么以及它如何在之前和之后花费 2 行价格?我需要验证在 activeflag 行上方和下方的 2 行是否存在相同的 custNum,然后获取平均值 其实没有。不知怎的,我误读了你的描述。请检查更新。 感谢您的解决方案!我修改了您的旧解决方案并使其正常工作。我将其发布在下面,如果需要修复,请告诉我! 顺便问一下是什么_?? 只有一列,所以我们知道我们取了多少个值。否则我们可能只取 3 但除以 4。【参考方案2】:

感谢 Zero323。你摇滚! 这是我根据您的帮助修改的代码片段,以获取我在结果中寻找的数据:

 val windw = Window.partitionBy("CustNumb").orderBy("PurchaseDate")
 val cond = ($"activeFlag" === 1) //&& (lag($"activeFlag", 1).over(win) === 0)
 val avgprice = (lag($"price", 1).over(windw)  + lag($"price", 2).over(windw) + lead($"price", 1).over(windw)  + lead($"price", 2).over(windw)) / 4.0
 val expr = when(cond, avgprice).otherwise($"price")
 val finalresult = df.withColumn("newPrice", expr)

我唯一要弄清楚的是,如果 activeflag = 1 存在于上面的行中,那么我想在 activeflag=1 的行上方多走一行。如果我找到解决方法来解决这个问题,我会尝试更新。

【讨论】:

看起来几乎是正确的,但它没有涵盖更短窗口的可能性(例如只有 1 个以下)。我一开始也错过了。最简单的方法是使用avg($"price").over(w.rowsBetween(-2, 2)),但它会考虑当前行。

以上是关于SparkSQL在分组后从数据帧中获取之前和之后的行的主要内容,如果未能解决你的问题,请参考以下文章

在R中具有相同组ID的另一个数据帧中按值过滤分组数据帧

如何从两个合并的数据帧中选择完成之前和之后的特定时间间隔?

应用 groupby 后从组中获取特定元素-PANDAS [重复]

基于分组数据帧中的两个条件的子集

在方面适合后从 UIImageView 获取图像的宽度和高度

按值(不是列)分组后从组中选择一个随机条目?