pyspark中基于条件对多列进行分组的累积和函数
Posted
技术标签:
【中文标题】pyspark中基于条件对多列进行分组的累积和函数【英文标题】:cumulative sum function in pyspark grouping on multiple columns based on condition 【发布时间】:2017-09-16 00:41:58 【问题描述】:我需要创建一个 event_id,基本上是一个在多个列上分组的计数器(v_id,d_id,ip,l_id),并在 delta > 40 时递增它以获得 这样的输出
v_id d_id ip l_id delta event_id last_event_flag 1 20 30 40 1 1 牛 1 20 30 40 2 1 牛 1 20 30 40 3 1 牛 1 20 30 40 4 1 是 1 20 20 40 1 1 是 1 30 30 40 2 1 牛 1 30 30 40 3 1 牛 1 30 30 40 4 1 牛 1 30 30 40 5 1 是我能够使用 pandas 数据框实现这一目标
df['event_id'] = (df.delta >=40.0).groupby([df.l_id,df.v_id,d_id,ip]).cumsum() + 1
df.append(df['event_id'], ignore_index=True
但在较大数据上执行时出现内存错误。
如何在 pyspark 中做类似的事情。
【问题讨论】:
【参考方案1】:在 pyspark 中,您可以使用 window
函数:
首先让我们创建数据框。请注意,您也可以直接将其作为数据帧从 csv 加载:
df = spark.createDataFrame(
sc.parallelize(
[[1,20,30,40,1,1],
[1,20,30,40,2,1],
[1,20,30,40,3,1],
[1,20,30,40,4,1],
[1,20,30,40,45,2],
[1,20,30,40,1,2],
[1,30,30,40,2,1],
[1,30,30,40,3,1],
[1,30,30,40,4,1],
[1,30,30,40,5,1]]
),
["v_id","d_id","ip","l_id","delta","event_id"]
)
您的表中有一个隐式排序,我们需要创建一个单调递增的 id,这样我们就不会最终打乱它:
import pyspark.sql.functions as psf
df = df.withColumn(
"rn",
psf.monotonically_increasing_id()
)
+----+----+---+----+-----+--------+----------+
|v_id|d_id| ip|l_id|delta|event_id| rn|
+----+----+---+----+-----+--------+----------+
| 1| 20| 30| 40| 1| 1| 0|
| 1| 20| 30| 40| 2| 1| 1|
| 1| 20| 30| 40| 3| 1| 2|
| 1| 20| 30| 40| 4| 1| 3|
| 1| 20| 30| 40| 45| 2| 4|
| 1| 20| 30| 40| 1| 2|8589934592|
| 1| 30| 30| 40| 2| 1|8589934593|
| 1| 30| 30| 40| 3| 1|8589934594|
| 1| 30| 30| 40| 4| 1|8589934595|
| 1| 30| 30| 40| 5| 1|8589934596|
+----+----+---+----+-----+--------+----------+
现在计算event_id
和last_event_flag
:
from pyspark.sql import Window
w1 = Window.partitionBy("v_id", "d_id", "l_id", "ip").orderBy("rn")
w2 = Window.partitionBy("v_id", "d_id", "l_id", "ip").orderBy(psf.desc("rn"))
df.withColumn(
"event_id",
psf.sum((df.delta >= 40).cast("int")).over(w1) + 1
).withColumn(
"last_event_flag",
psf.row_number().over(w2) == 1
).drop("rn")
+----+----+---+----+-----+--------+---------------+
|v_id|d_id| ip|l_id|delta|event_id|last_event_flag|
+----+----+---+----+-----+--------+---------------+
| 1| 20| 30| 40| 1| 1| false|
| 1| 20| 30| 40| 2| 1| false|
| 1| 20| 30| 40| 3| 1| false|
| 1| 20| 30| 40| 4| 1| false|
| 1| 20| 30| 40| 45| 2| false|
| 1| 20| 30| 40| 1| 2| true|
| 1| 30| 30| 40| 2| 1| false|
| 1| 30| 30| 40| 3| 1| false|
| 1| 30| 30| 40| 4| 1| false|
| 1| 30| 30| 40| 5| 1| true|
+----+----+---+----+-----+--------+---------------+
【讨论】:
你好玛丽,谢谢你的回答,我会试试的。还需要再添加一个标志(last_event_flag)来标识该组中的最后一个事件。有什么建议吗? 您修改了样本数据,是故意的吗? 我在没有修改数据样本的情况下添加了last_event_flag
的计算。希望对您有所帮助,别忘了将您的问题标记为已解决【参考方案2】:
也许您应该在运行 groupby 之前计算 df = df[df.delta>=40] - 我不确定这是否重要。
您还可以查看 chunksize 以根据 csv 的块执行计算以提高内存效率。因此,您可以将数据分成 10000 行的块,然后运行计算以避免内存错误。
https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_csv.html
How to read a 6 GB csv file with pandas
【讨论】:
以上是关于pyspark中基于条件对多列进行分组的累积和函数的主要内容,如果未能解决你的问题,请参考以下文章