Pyspark:使用带有参数的UDF创建一个新列[重复]

Posted

技术标签:

【中文标题】Pyspark:使用带有参数的UDF创建一个新列[重复]【英文标题】:Pyspark: Using UDF with argument(s) to create a new column [duplicate] 【发布时间】:2018-01-25 13:22:32 【问题描述】:

我有一个用户定义的函数,如下所示,我想用它来派生数据框中的新列:

def to_date_formatted(date_str, format):
    if date_str == '' or date_str is None:
        return None
    try:
        dt = datetime.datetime.strptime(date_str, format)
    except:
        return None
    return dt.date()

spark.udf.register("to_date_udf", to_date_formatted, DateType())

我可以通过运行像select to_date_udf(my_date, '%d-%b-%y') as date 这样的sql 来使用它。注意将自定义格式作为参数传递给函数的能力

但是,我正在努力使用 pyspark 列表达式语法而不是 sql 来使用它

我想写这样的东西:

df.with_column("date", to_date_udf('my_date', %d-%b-%y')

但这会导致错误。我该怎么做?

[编辑:在此特定示例中,在 Spark 2.2+ 中,您可以使用内置的 to_date 函数提供可选格式参数。我目前在 Spark 2.0 上,所以这对我来说是不可能的。另外值得注意的是,我提供了这个作为示例,但我感兴趣的是为 UDF 提供参数的一般语法,而不是日期转换的细节]

【问题讨论】:

谢谢您-该问题的答案很有用并证实了我的发现。我花了一段时间谷歌试图找到这个问题的答案,但我正在更广泛地搜索“pyspark udf arguments”之类的东西,而另一个问题的标题只是与此间接相关。 【参考方案1】:

我找到了三个选项来实现这一点:

设置可重现示例

import pandas as pd 
import datetime 

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

from pyspark.sql.types import DateType
from pyspark.sql.functions import expr, lit

sc = SparkContext.getOrCreate()
spark = SparkSession(sc) 

def to_date_formatted(date_str, format):
    if date_str == '' or date_str is None:
        return None
    try:
        dt = datetime.datetime.strptime(date_str, format)
    except:
        return None
    return dt.date()

data = 
data["date_str_1"] = ["01-Dec-17", "05-Jan-12", "08-Mar-15"]
data["date_str_2"] = ["01/12/17", "05/01/12", "08/03/15"]

df = pd.DataFrame(data)
df = spark.createDataFrame(df)
df.registerTempTable('df')

选项 1

from pyspark.sql.functions import udf
to_date_udf = udf(to_date_formatted, DateType())
df = df.withColumn("parsed_date", to_date_udf('date_str_1', lit('%d-%b-%y')))
df.show()

选项 2

spark.udf.register("to_date_udf", to_date_formatted, DateType())
ex = "to_date_udf(date_str_1, '%d-%b-%y') as d"
df = df.withColumn("parsed_date", expr(ex))

df.show()

选项 3

选项 3 只是对to_date_formatted 函数进行柯里化:

from functools import partial
curried_to_date = partial(to_date_formatted, format="%d-%b-%y")

curried_to_date = udf(curried_to_date, DateType())
df.withColumn("parsed_date", curried_to_date('date_str_1'))

【讨论】:

【参考方案2】:

只需使用to_date:

from pyspark.sql.functions import to_date

df.withColumn("date_str_1_", to_date("date_str_1", "dd-MMM-yy")).show()
# +----------+----------+-----------+
# |date_str_1|date_str_2|date_str_1_|
# +----------+----------+-----------+
# | 01-Dec-17|  01/12/17| 2017-12-01|
# | 05-Jan-12|  05/01/12| 2012-01-05|
# | 08-Mar-15|  08/03/15| 2015-03-08|
# +----------+----------+-----------+

df.withColumn("date_str_2_", to_date("date_str_2", "dd/MM/yy")).show()
# +----------+----------+-----------+
# |date_str_1|date_str_2|date_str_2_|
# +----------+----------+-----------+
# | 01-Dec-17|  01/12/17| 2017-12-01|
# | 05-Jan-12|  05/01/12| 2012-01-05|
# | 08-Mar-15|  08/03/15| 2015-03-08|
# +----------+----------+-----------+

【讨论】:

谢谢。我在 Spark 2.0 上,不幸的是 to_date 直到 Spark 2.2 才使用日期格式化程序 - 在那之前,字符串必须是 iso 格式 与早期版本***.com/q/45492203/8371915

以上是关于Pyspark:使用带有参数的UDF创建一个新列[重复]的主要内容,如果未能解决你的问题,请参考以下文章

在 for 循环中使用 udf 在 Pyspark 中创建多个列

PySpark 用户定义函数 (UDF) 创建新列

如何创建 Pyspark UDF 以向数据框添加新列

Pyspark:在UDF中传递多列以及参数

使用 udf 传递列作为参数将自定义列添加到 pyspark 数据帧

具有多个参数的 PySpark UDF 返回 null