pyspark中基于条件对多列进行分组的累积和函数

Posted

技术标签:

【中文标题】pyspark中基于条件对多列进行分组的累积和函数【英文标题】:cumulative sum function in pyspark grouping on multiple columns based on condition 【发布时间】:2017-09-16 00:41:58 【问题描述】:

我需要创建一个 event_id,基本上是一个在多个列上分组的计数器(v_id,d_id,ip,l_id),并在 delta > 40 时递增它以获得 这样的输出

v_id d_id ip l_id delta event_id last_event_flag 1 20 30 40 1 1 牛 1 20 30 40 2 1 牛 1 20 30 40 3 1 牛 1 20 30 40 4 1 是 1 20 20 40 1 1 是 1 30 30 40 2 1 牛 1 30 30 40 3 1 牛 1 30 30 40 4 1 牛 1 30 30 40 5 1 是

我能够使用 pandas 数据框实现这一目标

df['event_id'] = (df.delta >=40.0).groupby([df.l_id,df.v_id,d_id,ip]).cumsum() + 1
df.append(df['event_id'], ignore_index=True

但在较大数据上执行时出现内存错误。

如何在 pyspark 中做类似的事情。

【问题讨论】:

【参考方案1】:

在 pyspark 中,您可以使用 window 函数:

首先让我们创建数据框。请注意,您也可以直接将其作为数据帧从 csv 加载:

df = spark.createDataFrame(
    sc.parallelize(
        [[1,20,30,40,1,1],
        [1,20,30,40,2,1],
        [1,20,30,40,3,1],
        [1,20,30,40,4,1],
        [1,20,30,40,45,2],
        [1,20,30,40,1,2],
        [1,30,30,40,2,1],
        [1,30,30,40,3,1],
        [1,30,30,40,4,1],
        [1,30,30,40,5,1]]
    ), 
    ["v_id","d_id","ip","l_id","delta","event_id"]
)

您的表中有一个隐式排序,我们需要创建一个单调递增的 id,这样我们就不会最终打乱它:

import pyspark.sql.functions as psf
df = df.withColumn(
    "rn", 
    psf.monotonically_increasing_id()
)
    +----+----+---+----+-----+--------+----------+
    |v_id|d_id| ip|l_id|delta|event_id|        rn|
    +----+----+---+----+-----+--------+----------+
    |   1|  20| 30|  40|    1|       1|         0|
    |   1|  20| 30|  40|    2|       1|         1|
    |   1|  20| 30|  40|    3|       1|         2|
    |   1|  20| 30|  40|    4|       1|         3|
    |   1|  20| 30|  40|   45|       2|         4|
    |   1|  20| 30|  40|    1|       2|8589934592|
    |   1|  30| 30|  40|    2|       1|8589934593|
    |   1|  30| 30|  40|    3|       1|8589934594|
    |   1|  30| 30|  40|    4|       1|8589934595|
    |   1|  30| 30|  40|    5|       1|8589934596|
    +----+----+---+----+-----+--------+----------+

现在计算event_idlast_event_flag

from pyspark.sql import Window
w1 = Window.partitionBy("v_id", "d_id", "l_id", "ip").orderBy("rn")
w2 = Window.partitionBy("v_id", "d_id", "l_id", "ip").orderBy(psf.desc("rn"))
df.withColumn(
    "event_id", 
    psf.sum((df.delta >= 40).cast("int")).over(w1) + 1
).withColumn(
    "last_event_flag", 
    psf.row_number().over(w2) == 1
).drop("rn")

    +----+----+---+----+-----+--------+---------------+
    |v_id|d_id| ip|l_id|delta|event_id|last_event_flag|
    +----+----+---+----+-----+--------+---------------+
    |   1|  20| 30|  40|    1|       1|          false|
    |   1|  20| 30|  40|    2|       1|          false|
    |   1|  20| 30|  40|    3|       1|          false|
    |   1|  20| 30|  40|    4|       1|          false|
    |   1|  20| 30|  40|   45|       2|          false|
    |   1|  20| 30|  40|    1|       2|           true|
    |   1|  30| 30|  40|    2|       1|          false|
    |   1|  30| 30|  40|    3|       1|          false|
    |   1|  30| 30|  40|    4|       1|          false|
    |   1|  30| 30|  40|    5|       1|           true|
    +----+----+---+----+-----+--------+---------------+

【讨论】:

你好玛丽,谢谢你的回答,我会试试的。还需要再添加一个标志(last_event_flag)来标识该组中的最后一个事件。有什么建议吗? 您修改了样本数据,是故意的吗? 我在没有修改数据样本的情况下添加了last_event_flag的计算。希望对您有所帮助,别忘了将您的问题标记为已解决【参考方案2】:

也许您应该在运行 groupby 之前计算 df = df[df.delta>=40] - 我不确定这是否重要。

您还可以查看 chunksize 以根据 csv 的块执行计算以提高内存效率。因此,您可以将数据分成 10000 行的块,然后运行计算以避免内存错误。

https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_csv.html

How to read a 6 GB csv file with pandas

【讨论】:

以上是关于pyspark中基于条件对多列进行分组的累积和函数的主要内容,如果未能解决你的问题,请参考以下文章

基于多列中的直接和间接相似性对变量进行分组的快速方法

如何在pyspark数据框中找到没有分组的累积频率

PySpark 函数基于多列数据框创建自定义输出

在 pyspark 中对大量列进行累积求和的优化方法

Pyspark 有条件的累积和

Pyspark - 具有重置条件的累积和