SparkSQL在分组后从数据帧中获取之前和之后的行
Posted
技术标签:
【中文标题】SparkSQL在分组后从数据帧中获取之前和之后的行【英文标题】:SparkSQL Fetch rows before and after from dataframe after grouping 【发布时间】:2016-01-21 01:33:51 【问题描述】:给定这个 Dataframe df
+-----------+--------------------+-------------+-------+
|CustNumb | PurchaseDate| price| activeFlag|
+-----------+--------------------+-------------+-------+
| 3|2013-07-17 00:00:...| 17.9| 0|
| 3|2013-08-27 00:00:...| 61.13| 0|
| 3|2013-08-28 00:00:...| 25.07| 1|
| 3|2013-08-29 00:00:...| 24.23| 0|
| 3|2013-09-06 00:00:...| 3.94| 0|
| 20|2013-02-28 00:00:...| 354.64| 0|
| 20|2013-04-07 00:00:...| 15.0| 0|
| 20|2013-05-10 00:00:...| 545.0| 0|
| 28|2013-02-17 00:00:...| 190.0| 0|
| 28|2013-04-08 00:00:...| 20.0| 0|
| 28|2013-04-16 00:00:...| 89.0| 0|
| 28|2013-05-18 00:00:...| 260.0| 0|
| 28|2013-06-06 00:00:...| 586.57| 1|
| 28|2013-06-09 00:00:...| 250.0| 0|
当它发现一个非活动标志“1”时,我想得到一个结果,它返回按购买日期订购前后两行的平均价格。这是我正在寻找的结果:
+-----------+--------------------+-------------+-------+---------------+
|CustNumb | PurchaseDate| price| activeFlag| OutputVal |
+-----------+--------------------+-------------+-------+------------+
| 3|2013-07-17 00:00:...| 17.9| 0| 17.9
| 3|2013-08-27 00:00:...| 61.13| 0| 61.13
| 3|2013-08-28 00:00:...| 25.07| 1| 26.8 (avg of 2 prices before and 2 after)
| 3|2013-08-29 00:00:...| 24.23| 0| 24.23
| 3|2013-09-06 00:00:...| 3.94| 0| 3.94
| 20|2013-02-28 00:00:...| 354.64| 0| 354.64
| 20|2013-04-07 00:00:...| 15.0| 0| 15.0
| 20|2013-05-10 00:00:...| 545.0| 0| 545.0
| 28|2013-02-17 00:00:...| 190.0| 0| 190.0
| 28|2013-04-08 00:00:...| 20.0| 0| 20.0
| 28|2013-04-16 00:00:...| 89.0| 0| 89.0
| 28|2013-05-18 00:00:...| 260.0| 0| 260.0
| 28|2013-06-06 00:00:...| 586.57| 1| 199.6 (avg of 2 prices before and 1 after)
| 28|2013-06-09 00:00:...| 250.0| 0| 250
在上面的 custNum 3 和 28 示例中,我有 activeFlag 1,所以如果存在相同的 custNumb,我需要计算前后 2 行的平均值。
我正在考虑在数据帧上使用窗口函数,但由于我对 spark 编程还很陌生,所以无法在 spark 中解决这个问题
val w = Window.partitionBy("CustNumb").orderBy("PurchaseDate")
我怎样才能做到这一点,是否可以通过 Window 函数或任何更好的方法来实现?
【问题讨论】:
【参考方案1】:如果您已经有窗口,像这样的简单条件应该可以正常工作:
val cond = ($"activeFlag" === 1) && (lag($"activeFlag", 1).over(w) === 0)
// Windows covering rows before and after
val before = w.rowsBetween(-2, -1)
val after = w.rowsBetween(1, 2)
// Expression with sum of rows and number of rows
val sumPrice = sum($"price").over(before) + sum($"price").over(after)
val countPrice = sum($"ones_").over(before) + sum($"ones_").over(after)
val expr = when(cond, sumPrice / countPrice).otherwise($"price")
df.withColumn("ones_", lit(1)).withColumn("outputVal", expr)
【讨论】:
Zero323。感谢您的解决方案,但您能解释一下查找 avgprice 的滞后功能吗? lag($"price,1).over(w) 返回什么以及它如何在之前和之后花费 2 行价格?我需要验证在 activeflag 行上方和下方的 2 行是否存在相同的 custNum,然后获取平均值 其实没有。不知怎的,我误读了你的描述。请检查更新。 感谢您的解决方案!我修改了您的旧解决方案并使其正常工作。我将其发布在下面,如果需要修复,请告诉我! 顺便问一下是什么_?? 只有一列,所以我们知道我们取了多少个值。否则我们可能只取 3 但除以 4。【参考方案2】:感谢 Zero323。你摇滚! 这是我根据您的帮助修改的代码片段,以获取我在结果中寻找的数据:
val windw = Window.partitionBy("CustNumb").orderBy("PurchaseDate")
val cond = ($"activeFlag" === 1) //&& (lag($"activeFlag", 1).over(win) === 0)
val avgprice = (lag($"price", 1).over(windw) + lag($"price", 2).over(windw) + lead($"price", 1).over(windw) + lead($"price", 2).over(windw)) / 4.0
val expr = when(cond, avgprice).otherwise($"price")
val finalresult = df.withColumn("newPrice", expr)
我唯一要弄清楚的是,如果 activeflag = 1 存在于上面的行中,那么我想在 activeflag=1 的行上方多走一行。如果我找到解决方法来解决这个问题,我会尝试更新。
【讨论】:
看起来几乎是正确的,但它没有涵盖更短窗口的可能性(例如只有 1 个以下)。我一开始也错过了。最简单的方法是使用avg($"price").over(w.rowsBetween(-2, 2))
,但它会考虑当前行。以上是关于SparkSQL在分组后从数据帧中获取之前和之后的行的主要内容,如果未能解决你的问题,请参考以下文章
应用 groupby 后从组中获取特定元素-PANDAS [重复]