PySpark UDF 无法识别参数数量
Posted
技术标签:
【中文标题】PySpark UDF 无法识别参数数量【英文标题】:PySpark UDF not recognizing number of arguments 【发布时间】:2019-10-16 09:43:21 【问题描述】:我定义了一个 Python 函数“DateTimeFormat”,它接受三个参数
具有日期格式(字符串)的 Spark Dataframe 列 yyyy-mm-dd等列值的输入格式(字符串) 输出格式,即输入必须返回的格式,如 yyyymmdd (String)
我现在已经在 Pyspark 中将此函数注册为 UDF。
udf_date_time = udf(DateTimeFormat,StringType())
我正在尝试在数据框选择中调用此 UDF,只要输入格式和输出不同,如下所示,它似乎工作正常
df.select(udf_date_time('entry_date',lit('mmddyyyy'),lit('yyyy-mm-dd')))
但是当输入格式和输出格式相同时失败,出现如下错误
df.select('exit_date',udf_date_time('exit_date',lit('yyyy-mm-dd'),lit('yyyy-mm-dd')))
"DateTimeFormat" 正好采用 3 个参数。给定2个
但我显然向 UDF 发送了三个参数
我已经在 Python 2.7 和 Spark 2.1 上尝试过上述示例
当输入和输出格式相同时,该函数在普通 Python 中似乎可以正常工作
>>>DateTimeFormat('10152019','mmddyyyy','mmddyyyy')
'10152019'
>>>
但下面的代码在 SPARK 中运行时会出错
import datetime
# Standard date,timestamp formatter
# Takes string date, its format and output format as arguments
# Returns string formatted date
def DateTimeFormat(col,in_frmt,out_frmt):
date_formatter ='yyyy':'%Y','mm':'%m','dd':'%d','HH':'%H','MM':'%M','SS':'%S'
for key,value in date_formatter.items():
in_frmt = in_frmt.replace(key,value)
out_frmt = out_frmt.replace(key,value)
return datetime.datetime.strptime(col,in_frmt).strftime(out_frmt)
使用以下代码调用 UDF
from pyspark.sql.functions import udf,lit
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
# Create SPARK session
spark = SparkSession.builder.appName("DateChanger").enableHiveSupport().getOrCreate()
df = spark.read.format("csv").option("header", "true").load(file_path)
# Registering UDF
udf_date_time = udf(DateTimeFormat,StringType())
df.select('exit_date',udf_date_time('exit_date',lit('yyyy-mm-dd'),lit('yyyy-mm-dd'))).show()
CSV文件输入Input file
预期结果是命令
df.select('exit_date',udf_date_time('exit_date',lit('yyyy-mm-dd'),lit('yyyy-mm-dd'))).show()
不应抛出任何错误,例如 DateTimeFormat 只接受 3 个参数,但给定了 2 个
【问题讨论】:
【参考方案1】:我不确定是否有更好的方法来做到这一点,但您可以尝试以下方法。
在这里,我假设您希望您的日期为特定格式,并在您的 DateTimeFormat
函数中设置了默认的输出格式 (out_frmt='yyyy-mm-dd'
)
我添加了一个名为udf_score
的新函数来帮助进行转换。你可能会感兴趣
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, lit
df = spark.createDataFrame([
["10-15-2019"],
["10-16-2019"],
["10-17-2019"],
], ['exit_date'])
import datetime
def DateTimeFormat(col,in_frmt,out_frmt='yyyy-mm-dd'):
date_formatter ='yyyy':'%Y','mm':'%m','dd':'%d','HH':'%H','MM':'%M','SS':'%S'
for key,value in date_formatter.items():
in_frmt = in_frmt.replace(key,value)
out_frmt = out_frmt.replace(key,value)
return datetime.datetime.strptime(col,in_frmt).strftime(out_frmt)
def udf_score(in_frmt):
return udf(lambda l: DateTimeFormat(l, in_frmt))
in_frmt = 'mm-dd-yyyy'
df.select('exit_date',udf_score(in_frmt)('exit_date').alias('new_dates')).show()
+----------+----------+
| exit_date| new_dates|
+----------+----------+
|10-15-2019|2019-10-15|
|10-16-2019|2019-10-16|
|10-17-2019|2019-10-17|
+----------+----------+
【讨论】:
以上是关于PySpark UDF 无法识别参数数量的主要内容,如果未能解决你的问题,请参考以下文章