在同一分组中添加具有上述所有行总和的列
Posted
技术标签:
【中文标题】在同一分组中添加具有上述所有行总和的列【英文标题】:Adding column with sum of all rows above in same grouping 【发布时间】:2019-07-25 19:35:49 【问题描述】:我需要创建一个“滚动计数”列,该列采用以前的计数并为每一天和公司添加新计数。我已经将数据框组织和排序为每个公司的升序日期组,并具有相应的计数。我还添加了一个索引每个分组的“ix”列,如下所示:
+--------------------+--------------------+-----+---+
| Normalized_Date| company|count| ix|
+--------------------+--------------------+-----+---+
|09/25/2018 00:00:...|[5c40c8510fb7c017...| 7| 1|
|09/25/2018 00:00:...|[5bdb2b543951bf07...| 9| 1|
|11/28/2017 00:00:...|[593b0d9f3f21f9dd...| 7| 1|
|11/29/2017 00:00:...|[593b0d9f3f21f9dd...| 60| 2|
|01/09/2018 00:00:...|[593b0d9f3f21f9dd...| 1| 3|
|04/27/2018 00:00:...|[593b0d9f3f21f9dd...| 9| 4|
|09/25/2018 00:00:...|[593b0d9f3f21f9dd...| 29| 5|
|11/20/2018 00:00:...|[593b0d9f3f21f9dd...| 42| 6|
|12/11/2018 00:00:...|[593b0d9f3f21f9dd...| 317| 7|
|01/04/2019 00:00:...|[593b0d9f3f21f9dd...| 3| 8|
|02/13/2019 00:00:...|[593b0d9f3f21f9dd...| 15| 9|
|04/01/2019 00:00:...|[593b0d9f3f21f9dd...| 1| 10|
+--------------------+--------------------+-----+---+
我需要的输出只是将每个公司截至该日期的所有计数相加。像这样:
+--------------------+--------------------+-----+---+------------+
| Normalized_Date| company|count| ix|RollingCount|
+--------------------+--------------------+-----+---+------------+
|09/25/2018 00:00:...|[5c40c8510fb7c017...| 7| 1| 7|
|09/25/2018 00:00:...|[5bdb2b543951bf07...| 9| 1| 9|
|11/28/2017 00:00:...|[593b0d9f3f21f9dd...| 7| 1| 7|
|11/29/2017 00:00:...|[593b0d9f3f21f9dd...| 60| 2| 67|
|01/09/2018 00:00:...|[593b0d9f3f21f9dd...| 1| 3| 68|
|04/27/2018 00:00:...|[593b0d9f3f21f9dd...| 9| 4| 77|
|09/25/2018 00:00:...|[593b0d9f3f21f9dd...| 29| 5| 106|
|11/20/2018 00:00:...|[593b0d9f3f21f9dd...| 42| 6| 148|
|12/11/2018 00:00:...|[593b0d9f3f21f9dd...| 317| 7| 465|
|01/04/2019 00:00:...|[593b0d9f3f21f9dd...| 3| 8| 468|
|02/13/2019 00:00:...|[593b0d9f3f21f9dd...| 15| 9| 483|
|04/01/2019 00:00:...|[593b0d9f3f21f9dd...| 1| 10| 484|
+--------------------+--------------------+-----+---+------------+
我认为 lag 函数会很有用,并且我能够使用 ix > 1 获取每一行 rollingcount,并使用以下代码直接在其上方添加计数:
w = Window.partitionBy('company').orderBy(F.unix_timestamp('Normalized_Dat e','MM/dd/yyyy HH:mm:ss aaa').cast('timestamp'))
refined_DF = solutionDF.withColumn("rn", F.row_number().over(w))
solutionDF = refined_DF.withColumn('RollingCount',F.when(refined_DF['rn'] > 1, refined_DF['count'] + F.lag(refined_DF['count'],count= 1 ).over(w)).otherwise(refined_DF['count']))
产生以下df:
+--------------------+--------------------+-----+---+------------+
| Normalized_Date| company|count| ix|RollingCount|
+--------------------+--------------------+-----+---+------------+
|09/25/2018 00:00:...|[5c40c8510fb7c017...| 7| 1| 7|
|09/25/2018 00:00:...|[5bdb2b543951bf07...| 9| 1| 9|
|11/28/2017 00:00:...|[593b0d9f3f21f9dd...| 7| 1| 7|
|11/29/2017 00:00:...|[593b0d9f3f21f9dd...| 60| 2| 67|
|01/09/2018 00:00:...|[593b0d9f3f21f9dd...| 1| 3| 61|
|04/27/2018 00:00:...|[593b0d9f3f21f9dd...| 9| 4| 10|
|09/25/2018 00:00:...|[593b0d9f3f21f9dd...| 29| 5| 38|
|11/20/2018 00:00:...|[593b0d9f3f21f9dd...| 42| 6| 71|
|12/11/2018 00:00:...|[593b0d9f3f21f9dd...| 317| 7| 359|
|01/04/2019 00:00:...|[593b0d9f3f21f9dd...| 3| 8| 320|
|02/13/2019 00:00:...|[593b0d9f3f21f9dd...| 15| 9| 18|
|04/01/2019 00:00:...|[593b0d9f3f21f9dd...| 1| 10| 16|
+--------------------+--------------------+-----+---+------------+
我只需要它对上面 ix 行的所有计数求和。我尝试使用 udf 来计算滞后函数的“计数”输入,但我不断收到“'列'对象不可调用”错误,而且它不会对所有行求和。我也尝试过使用循环,但这似乎是不可能的,因为它每次都会创建一个新的数据框,而且我需要在之后加入它们。必须有一种更简单、更简单的方法来做到这一点。也许与滞后不同的功能?
【问题讨论】:
你能写出你的预期输出吗?在这里更容易理解你想做什么。谢谢。 @NeilZ 我的预期输出是第二个数据帧 【参考方案1】:lag 在当前值之前返回某一行,但您需要一个范围来计算累积和。因此你必须使用窗口函数rangeBetween (rowsBetween)。看看下面的例子:
import pyspark.sql.functions as F
from pyspark.sql import Window
l = [
('09/25/2018', '5c40c8510fb7c017', 7, 1),
('09/25/2018', '5bdb2b543951bf07', 9, 1),
('11/28/2017', '593b0d9f3f21f9dd', 7, 1),
('11/29/2017', '593b0d9f3f21f9dd', 60, 2),
('01/09/2018', '593b0d9f3f21f9dd', 1, 3),
('04/27/2018', '593b0d9f3f21f9dd', 9, 4),
('09/25/2018', '593b0d9f3f21f9dd', 29, 5),
('11/20/2018', '593b0d9f3f21f9dd', 42, 6),
('12/11/2018', '593b0d9f3f21f9dd', 317, 7),
('01/04/2019', '593b0d9f3f21f9dd', 3, 8),
('02/13/2019', '593b0d9f3f21f9dd', 15, 9),
('04/01/2019', '593b0d9f3f21f9dd', 1, 10)
]
columns = ['Normalized_Date', 'company','count', 'ix']
df=spark.createDataFrame(l, columns)
df = df.withColumn('Normalized_Date', F.to_date(df.Normalized_Date, 'MM/dd/yyyy'))
w = Window.partitionBy('company').orderBy('Normalized_Date').rangeBetween(Window.unboundedPreceding, 0)
df = df.withColumn('Rolling_count', F.sum('count').over(w))
df.show()
输出:
+---------------+----------------+-----+---+-------------+
|Normalized_Date| company|count| ix|Rolling_count|
+---------------+----------------+-----+---+-------------+
| 2018-09-25|5c40c8510fb7c017| 7| 1| 7|
| 2018-09-25|5bdb2b543951bf07| 9| 1| 9|
| 2017-11-28|593b0d9f3f21f9dd| 7| 1| 7|
| 2017-11-29|593b0d9f3f21f9dd| 60| 2| 67|
| 2018-01-09|593b0d9f3f21f9dd| 1| 3| 68|
| 2018-04-27|593b0d9f3f21f9dd| 9| 4| 77|
| 2018-09-25|593b0d9f3f21f9dd| 29| 5| 106|
| 2018-11-20|593b0d9f3f21f9dd| 42| 6| 148|
| 2018-12-11|593b0d9f3f21f9dd| 317| 7| 465|
| 2019-01-04|593b0d9f3f21f9dd| 3| 8| 468|
| 2019-02-13|593b0d9f3f21f9dd| 15| 9| 483|
| 2019-04-01|593b0d9f3f21f9dd| 1| 10| 484|
+---------------+----------------+-----+---+-------------+
【讨论】:
【参考方案2】:试试这个。 您需要将所有先前行的总和与窗口框架中的当前行相加。
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.WindowSpec
import org.apache.spark.sql.functions._
val df = Seq(
("5c40c8510fb7c017", 7, 1),
("5bdb2b543951bf07", 9, 1),
("593b0d9f3f21f9dd", 7, 1),
("593b0d9f3f21f9dd", 60, 2),
("593b0d9f3f21f9dd", 1, 3),
("593b0d9f3f21f9dd", 9, 4),
("593b0d9f3f21f9dd", 29, 5),
("593b0d9f3f21f9dd", 42, 6),
("593b0d9f3f21f9dd", 317, 7),
("593b0d9f3f21f9dd", 3, 8),
("593b0d9f3f21f9dd", 15, 9),
("593b0d9f3f21f9dd", 1, 10)
).toDF("company", "count", "ix")
scala> df.show(false)
+----------------+-----+---+
|company |count|ix |
+----------------+-----+---+
|5c40c8510fb7c017|7 |1 |
|5bdb2b543951bf07|9 |1 |
|593b0d9f3f21f9dd|7 |1 |
|593b0d9f3f21f9dd|60 |2 |
|593b0d9f3f21f9dd|1 |3 |
|593b0d9f3f21f9dd|9 |4 |
|593b0d9f3f21f9dd|29 |5 |
|593b0d9f3f21f9dd|42 |6 |
|593b0d9f3f21f9dd|317 |7 |
|593b0d9f3f21f9dd|3 |8 |
|593b0d9f3f21f9dd|15 |9 |
|593b0d9f3f21f9dd|1 |10 |
+----------------+-----+---+
scala> val overColumns = Window.partitionBy("company").orderBy("ix").rowsBetween(Window.unboundedPreceding, Window.currentRow)
overColumns: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@3ed5e17c
scala> val outputDF = df.withColumn("RollingCount", sum("count").over(overColumns))
outputDF: org.apache.spark.sql.DataFrame = [company: string, count: int ... 2 more fields]
scala> outputDF.show(false)
+----------------+-----+---+------------+
|company |count|ix |RollingCount|
+----------------+-----+---+------------+
|5c40c8510fb7c017|7 |1 |7 |
|5bdb2b543951bf07|9 |1 |9 |
|593b0d9f3f21f9dd|7 |1 |7 |
|593b0d9f3f21f9dd|60 |2 |67 |
|593b0d9f3f21f9dd|1 |3 |68 |
|593b0d9f3f21f9dd|9 |4 |77 |
|593b0d9f3f21f9dd|29 |5 |106 |
|593b0d9f3f21f9dd|42 |6 |148 |
|593b0d9f3f21f9dd|317 |7 |465 |
|593b0d9f3f21f9dd|3 |8 |468 |
|593b0d9f3f21f9dd|15 |9 |483 |
|593b0d9f3f21f9dd|1 |10 |484 |
+----------------+-----+---+------------+
【讨论】:
以上是关于在同一分组中添加具有上述所有行总和的列的主要内容,如果未能解决你的问题,请参考以下文章
对最后一小时分组中的列值求和,然后将所有 5 的总和作为另一列中的总和