熊猫 groupby.apply 到 pyspark

Posted

技术标签:

【中文标题】熊猫 groupby.apply 到 pyspark【英文标题】:pandas groupby.apply to pyspark 【发布时间】:2021-02-15 09:52:05 【问题描述】:

我有以下自定义函数在我的 pandas 数据框中进行聚合,我想在 pyspark 中做同样的事情:

def custom_aggregation_pyspark(x,queries):
    names=
    for k, v in regles_calcul.items():
        plus = x.query(v["plus_credit"])['OBNETCRE'].sum() + x.query(v["plus_debit"])['OBNETDEB'].sum()
        minus = x.query(v["minus_credit"])['OBNETCRE'].sum() + x.query(v["minus_debit"])['OBNETDEB'].sum()  

     

        names[k]= plus-minus
    return pd.Series(names, index=list(names.keys()))


df = df.groupby(['LBUDG']).apply(custom_aggregation_pandas, queries ).sum()

were queries 是查询的字典,如

'first_queries':    
    'plus_credit': "classe_compte_rg2 in ('237', '238')",
    'plus_debit': "classe_compte_rg2 in ('237', '238')", 
    'minus_credit': "classe_compte_rg2 in ('237', '238')", 
    'minus_debit': "classe_compte_rg1 in ('20', '21', '23')"
     

所以,我用 pyspark 'sql' 替换了 pandas “查询”

def custom_aggregation_pyspark(x,queries):
    x.createOrReplaceTempView("df")
    names=
    for k , v in queries.items():
        plus = spark.sql("SELECT * FROM df WHERE "+ v["plus_credit"]).select('OBNETCRE').groupby('OBNETCRE').sum().collect() + spark.sql("SELECT * FROM df WHERE "+ v["plus_debit"]).select('OBNETDEB').groupby('OBNETDEB').sum().collect() 
        minus= spark.sql("SELECT * FROM df WHERE "+ v["minus_credit"]).select('OBNETCRE').groupby('OBNETCRE').sum().collect() + spark.sql("SELECT * FROM df WHERE "+ v["minus_debit"]).select('OBNETDEB').groupby('OBNETDEB').sum().collect() 
        names[k]= plus-minus
    return pd.Series(names, index=list(names.keys()))

df.groupby("LBUDG").agg(custom_aggregation_pyspark(df,queries))

我肯定走错了方向,因为上面的代码不起作用,请你指导我应该看哪里?

所需的输出是按LBUDG(字符串)分组的表,其他列使用自定义聚合函数。

编辑数据框示例:

LBUDG OBNETCRE OBNETDEB classe_compte_rg0 classe_compte_rg1
LE POIZAT 0,00 0,00 1 10
LE POIZAT 67572,00 0,00 1 10
LE POIZAT 0,00 0,00 1 10
LE POIZAT 4908,12 0,00 1 10
LE POIZAT 0,00 0,00 1 10
DAFOUR 295240,67 0,00 1 10
LE POIZAT 0,00 0,00 1 11
LE POIZAT 0,00 0,00 1 12
LE POIZAT 0,00 0,00 1 13
LE POIZAT 0,00 0,00 1 13
LE POIZAT 53697,94 0,00 1 13

预期输出:

LBUDG AGG1 AGG2
LE POIZAT agg1_value for LE POIZAT ...
DAFOUR .... ...

其中 agg1 对应于(例如)OBNETCRE - OBNETDEB 的总和,其中classe_compte_rg1 具有值,是 10 或 11。

【问题讨论】:

【参考方案1】:

您可以使用epxr 来评估queries dict 中传递的条件,并使用条件聚合来计算总和。这是一个与您在 pandas 中给出的示例等效的示例:

from pyspark.sql import functions as F


def custom_aggregation_pyspark(df, regles_calcul):
    df1 = df.groupBy("LBUDG") \
        .agg(
        *[
            ((F.sum(F.when(F.expr(v["plus_credit"]), F.col("OBNETCRE")).otherwise(0)) +
              F.sum(F.when(F.expr(v["plus_debit"]), F.col("OBNETDEB")).otherwise(0))) -
             (F.sum(F.when(F.expr(v["minus_credit"]), F.col("OBNETCRE")).otherwise(0)) +
              F.sum(F.when(F.expr(v["minus_debit"]), F.col("OBNETDEB")).otherwise(0)))
             ).alias(k)

            for k, v in regles_calcul.items()
        ]
    )

    return df1


df = custom_aggregation_pyspark(df, queries)

【讨论】:

以上是关于熊猫 groupby.apply 到 pyspark的主要内容,如果未能解决你的问题,请参考以下文章

大熊猫分类变量的百分比计数

一次在多列上使用 pandas groupby().apply(list) [重复]

使用带有参数的 Pandas groupby() + apply()

groupby+(apply+agg+transform)方法的比较

pandas groupby apply 真的很慢

GroupBy,Apply用法笔记