PySpark 用户定义函数 (UDF) 创建新列

Posted

技术标签:

【中文标题】PySpark 用户定义函数 (UDF) 创建新列【英文标题】:PySpark Uuser defined function (UDF) to create new column 【发布时间】:2021-10-21 11:36:20 【问题描述】:

我有下表:

name  time  
 a     5.2
 b    10.4
 c     7.8
 d    11.2
 e     3.5
 f    6.27
 g    2.43

我想创建额外的列(col1、col2、col2),其中 col1 > time 10,col2

我已尝试使用以下 UDF;但无法从那里去。

def UserDefinedfunction(value): 
    if value > 10: 
  return 'True' 
    else: 
  return 'False'

udf_func = udf(UserDefinedfunction, StringType())

任何帮助都会得到高度认可。

【问题讨论】:

【参考方案1】:

您可以使用纯 pyspark 代替 UDF:

from pyspark.sql import functions as F
from pyspark.sql import types

d = [
    ("a", 5.2),
    ("b", 10.4),
    ("c", 7.8),
    ("d", 11.2),
    ("e", 3.5),
    ("f", 6.27),
    ("g", 2.43)
    
]
df = spark.createDataFrame(d, ['name','time'])
df.show()

# output
+----+----+
|name|time|
+----+----+
|   a| 5.2|
|   b|10.4|
|   c| 7.8|
|   d|11.2|
|   e| 3.5|
|   f|6.27|
|   g|2.43|
+----+----+

(
    df
    .withColumn("col1", F.when(F.col("time") > 10, True).otherwise(False))
    .withColumn("col2", F.when(F.col("time") < 0, True).otherwise(False))
    .withColumn("col3", F.when(
        (
            (F.col("time") <= 12) & 
            (F.col("Time") >= 0)
            ), True).otherwise(False))
    .show()
)

# output
+----+----+-----+-----+----+
|name|time| col1| col2|col3|
+----+----+-----+-----+----+
|   a| 5.2|false|false|true|
|   b|10.4| true|false|true|
|   c| 7.8|false|false|true|
|   d|11.2| true|false|true|
|   e| 3.5|false|false|true|
|   f|6.27|false|false|true|
|   g|2.43|false|false|true|
+----+----+-----+-----+----+

@编辑

如果出于某种原因需要 UDF,也许你可以这样做:

@F.pandas_udf(types.BooleanType())
def build_col1(s: pd.Series) -> pd.Series:
    return s.apply(lambda x: x > 10)

@F.pandas_udf(types.BooleanType())
def build_col2(s: pd.Series) -> pd.Series:
    return s.apply(lambda x: x < 0)

@F.pandas_udf(types.BooleanType())
def build_col3(s: pd.Series) -> pd.Series:
    return s.apply(lambda x: x >= 0 and x <= 12)

(
    df
    .withColumn("col1", build_col1("time"))
    .withColumn("col2", build_col2("time"))
    .withColumn("col3", build_col3("time"))
    .show()

)

# output
+----+----+-----+-----+----+
|name|time| col1| col2|col3|
+----+----+-----+-----+----+
|   a| 5.2|false|false|true|
|   b|10.4| true|false|true|
|   c| 7.8|false|false|true|
|   d|11.2| true|false|true|
|   e| 3.5|false|false|true|
|   f|6.27|false|false|true|
|   g|2.43|false|false|true|
+----+----+-----+-----+----+

【讨论】:

谢谢你;好吧,不幸的是,我的用例需要 UDF。 嗯,但你接受了答案? 因为您的努力和代码可能对某人有用。 @sam 看到我的编辑,这有帮助吗? 是的,这行得通;需要删除类型(types.BooleanType())。非常感谢。

以上是关于PySpark 用户定义函数 (UDF) 创建新列的主要内容,如果未能解决你的问题,请参考以下文章

如何创建 Pyspark UDF 以向数据框添加新列

PySpark / 计算出现次数并使用 UDF 创建新列

使用 PySpark 中的列表中的 withColumn 函数动态创建新列

在 PySpark Pandas UDF 中指定用户定义函数的正确方法

udf(用户定义函数)如何在 pyspark 中工作?

pyspark 的用户定义函数 (UDF) 是不是需要单元测试?