我需要创建一个 pyspark UDF,它通过比较从查询中输出一个表

Posted

技术标签:

【中文标题】我需要创建一个 pyspark UDF,它通过比较从查询中输出一个表【英文标题】:I need to create a pyspark UDF that outputs a table from a query with a comparison 【发布时间】:2021-07-13 08:07:46 【问题描述】:

我正在使用 Kaggle 上的 IBM 损耗数据集。我想要做的是将分类变量的出现次数计数到Attrition == 'Yes'Attrition == 'No',并采用简单的比率来查看哪个级别的分类变量更有可能减少。现在我可以在 Pandas 中执行此操作,如下所示:

def cal_ratio(x):
    n_1 = sum(x['Attrition'].values == 'Yes')
    n_0 = sum(x['Attrition'].values == 'No')
    return n_1/n_0

或者我可以很容易地编写一个 spark.sql 查询来完成它,然后为我想要比较的每个分类变量重新编写它。像 Pandas 这样的功能会让我的生活更轻松,但我找不到任何关于如何创建这种 UDF 或如何注册它的真正指导。

编辑:如果我还问这在 pyspark 中如何使用 UDF 可能会有所帮助?

b = data.groupby('BusinessTravel').apply(cal_ratio)

【问题讨论】:

【参考方案1】:

不确定这是最好的解决方案,但你可以试试这个:

# My sample dataframe
df.show()
+---------+                                                                     
|Attrition|
+---------+
|      Yes|
|      Yes|
|      Yes|
|      Yes|
|      Yes|
|       No|
|       No|
+---------+
from pyspark.sql import functions as F

result = (
    df.agg(
        F.sum(F.when(F.col("Attrition") == "Yes", 1)).alias("Yes"),
        F.sum(F.when(F.col("Attrition") == "No", 1)).alias("No"),
    )
    .select((F.col("Yes") / F.col("No")).alias("ratio"))
    .first()
)

print(result.ratio)
> 2.5

当然,您可以通过将硬编码值替换为变量来将 result 事物转换为函数。

def cal_ratio(df):
    result = (
        df.agg(
            F.sum(F.when(F.col("Attrition") == "Yes", 1)).alias("Yes"),
            F.sum(F.when(F.col("Attrition") == "No", 1)).alias("No"),
        )
        .select((F.col("Yes") / F.col("No")).alias("ratio"))
        .first()
    )
    return result.ratio

编辑:如果您需要按列分组,则需要将first 替换为collect

def cal_ratio(df):
    result = (
        df.groupBy("BusinessTravel")
        .agg(
            F.sum(F.when(F.col("Attrition") == "Yes", 1)).alias("Yes"),
            F.sum(F.when(F.col("Attrition") == "No", 1)).alias("No"),
        )
        .select((F.col("Yes") / F.col("No")).alias("ratio"))
        .collect()
    )
    return result

【讨论】:

以上是关于我需要创建一个 pyspark UDF,它通过比较从查询中输出一个表的主要内容,如果未能解决你的问题,请参考以下文章

pyspark 的用户定义函数 (UDF) 是不是需要单元测试?

pyspark udf 返回值

PySpark / 计算出现次数并使用 UDF 创建新列

无法序列化 PySpark UDF

通过 pyspark 中的 UDF 读取文本文件返回意外输出

在 UDF Pyspark 中更新变量值