Pyspark:使用带有参数的UDF创建一个新列[重复]
Posted
技术标签:
【中文标题】Pyspark:使用带有参数的UDF创建一个新列[重复]【英文标题】:Pyspark: Using UDF with argument(s) to create a new column [duplicate] 【发布时间】:2018-01-25 13:22:32 【问题描述】:我有一个用户定义的函数,如下所示,我想用它来派生数据框中的新列:
def to_date_formatted(date_str, format):
if date_str == '' or date_str is None:
return None
try:
dt = datetime.datetime.strptime(date_str, format)
except:
return None
return dt.date()
spark.udf.register("to_date_udf", to_date_formatted, DateType())
我可以通过运行像select to_date_udf(my_date, '%d-%b-%y') as date
这样的sql 来使用它。注意将自定义格式作为参数传递给函数的能力
但是,我正在努力使用 pyspark 列表达式语法而不是 sql 来使用它
我想写这样的东西:
df.with_column("date", to_date_udf('my_date', %d-%b-%y')
但这会导致错误。我该怎么做?
[编辑:在此特定示例中,在 Spark 2.2+ 中,您可以使用内置的 to_date
函数提供可选格式参数。我目前在 Spark 2.0 上,所以这对我来说是不可能的。另外值得注意的是,我提供了这个作为示例,但我感兴趣的是为 UDF 提供参数的一般语法,而不是日期转换的细节]
【问题讨论】:
谢谢您-该问题的答案很有用并证实了我的发现。我花了一段时间谷歌试图找到这个问题的答案,但我正在更广泛地搜索“pyspark udf arguments”之类的东西,而另一个问题的标题只是与此间接相关。 【参考方案1】:我找到了三个选项来实现这一点:
设置可重现示例
import pandas as pd
import datetime
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType
from pyspark.sql.functions import expr, lit
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
def to_date_formatted(date_str, format):
if date_str == '' or date_str is None:
return None
try:
dt = datetime.datetime.strptime(date_str, format)
except:
return None
return dt.date()
data =
data["date_str_1"] = ["01-Dec-17", "05-Jan-12", "08-Mar-15"]
data["date_str_2"] = ["01/12/17", "05/01/12", "08/03/15"]
df = pd.DataFrame(data)
df = spark.createDataFrame(df)
df.registerTempTable('df')
选项 1
from pyspark.sql.functions import udf
to_date_udf = udf(to_date_formatted, DateType())
df = df.withColumn("parsed_date", to_date_udf('date_str_1', lit('%d-%b-%y')))
df.show()
选项 2
spark.udf.register("to_date_udf", to_date_formatted, DateType())
ex = "to_date_udf(date_str_1, '%d-%b-%y') as d"
df = df.withColumn("parsed_date", expr(ex))
df.show()
选项 3
选项 3 只是对to_date_formatted
函数进行柯里化:
from functools import partial
curried_to_date = partial(to_date_formatted, format="%d-%b-%y")
curried_to_date = udf(curried_to_date, DateType())
df.withColumn("parsed_date", curried_to_date('date_str_1'))
【讨论】:
【参考方案2】:只需使用to_date
:
from pyspark.sql.functions import to_date
df.withColumn("date_str_1_", to_date("date_str_1", "dd-MMM-yy")).show()
# +----------+----------+-----------+
# |date_str_1|date_str_2|date_str_1_|
# +----------+----------+-----------+
# | 01-Dec-17| 01/12/17| 2017-12-01|
# | 05-Jan-12| 05/01/12| 2012-01-05|
# | 08-Mar-15| 08/03/15| 2015-03-08|
# +----------+----------+-----------+
df.withColumn("date_str_2_", to_date("date_str_2", "dd/MM/yy")).show()
# +----------+----------+-----------+
# |date_str_1|date_str_2|date_str_2_|
# +----------+----------+-----------+
# | 01-Dec-17| 01/12/17| 2017-12-01|
# | 05-Jan-12| 05/01/12| 2012-01-05|
# | 08-Mar-15| 08/03/15| 2015-03-08|
# +----------+----------+-----------+
【讨论】:
谢谢。我在 Spark 2.0 上,不幸的是 to_date 直到 Spark 2.2 才使用日期格式化程序 - 在那之前,字符串必须是 iso 格式 与早期版本***.com/q/45492203/8371915以上是关于Pyspark:使用带有参数的UDF创建一个新列[重复]的主要内容,如果未能解决你的问题,请参考以下文章
在 for 循环中使用 udf 在 Pyspark 中创建多个列