我需要创建一个 pyspark UDF,它通过比较从查询中输出一个表
Posted
技术标签:
【中文标题】我需要创建一个 pyspark UDF,它通过比较从查询中输出一个表【英文标题】:I need to create a pyspark UDF that outputs a table from a query with a comparison 【发布时间】:2021-07-13 08:07:46 【问题描述】:我正在使用 Kaggle 上的 IBM 损耗数据集。我想要做的是将分类变量的出现次数计数到Attrition == 'Yes'
和Attrition == 'No'
,并采用简单的比率来查看哪个级别的分类变量更有可能减少。现在我可以在 Pandas 中执行此操作,如下所示:
def cal_ratio(x):
n_1 = sum(x['Attrition'].values == 'Yes')
n_0 = sum(x['Attrition'].values == 'No')
return n_1/n_0
或者我可以很容易地编写一个 spark.sql 查询来完成它,然后为我想要比较的每个分类变量重新编写它。像 Pandas 这样的功能会让我的生活更轻松,但我找不到任何关于如何创建这种 UDF 或如何注册它的真正指导。
编辑:如果我还问这在 pyspark 中如何使用 UDF 可能会有所帮助?
b = data.groupby('BusinessTravel').apply(cal_ratio)
【问题讨论】:
【参考方案1】:不确定这是最好的解决方案,但你可以试试这个:
# My sample dataframe
df.show()
+---------+
|Attrition|
+---------+
| Yes|
| Yes|
| Yes|
| Yes|
| Yes|
| No|
| No|
+---------+
from pyspark.sql import functions as F
result = (
df.agg(
F.sum(F.when(F.col("Attrition") == "Yes", 1)).alias("Yes"),
F.sum(F.when(F.col("Attrition") == "No", 1)).alias("No"),
)
.select((F.col("Yes") / F.col("No")).alias("ratio"))
.first()
)
print(result.ratio)
> 2.5
当然,您可以通过将硬编码值替换为变量来将 result
事物转换为函数。
def cal_ratio(df):
result = (
df.agg(
F.sum(F.when(F.col("Attrition") == "Yes", 1)).alias("Yes"),
F.sum(F.when(F.col("Attrition") == "No", 1)).alias("No"),
)
.select((F.col("Yes") / F.col("No")).alias("ratio"))
.first()
)
return result.ratio
编辑:如果您需要按列分组,则需要将first
替换为collect
:
def cal_ratio(df):
result = (
df.groupBy("BusinessTravel")
.agg(
F.sum(F.when(F.col("Attrition") == "Yes", 1)).alias("Yes"),
F.sum(F.when(F.col("Attrition") == "No", 1)).alias("No"),
)
.select((F.col("Yes") / F.col("No")).alias("ratio"))
.collect()
)
return result
【讨论】:
以上是关于我需要创建一个 pyspark UDF,它通过比较从查询中输出一个表的主要内容,如果未能解决你的问题,请参考以下文章
pyspark 的用户定义函数 (UDF) 是不是需要单元测试?