Pyspark:如何处理 python 用户定义函数中的空值
Posted
技术标签:
【中文标题】Pyspark:如何处理 python 用户定义函数中的空值【英文标题】:Pyspark: How to deal with null values in python user defined functions 【发布时间】:2019-05-07 15:16:21 【问题描述】:我想使用一些不是 pyspark 原生的字符串相似性函数,例如数据帧上的 jaro 和 jaro-winkler 度量。这些在 jellyfish
等 python 模块中很容易获得。对于没有 null
值的情况,我可以写 pyspark udf 的罚款,即比较猫和狗。当我将这些 udf 应用于存在 null
值的数据时,它不起作用。在诸如我正在解决的问题中,其中一个字符串为null
我需要帮助让我的字符串相似性 udf 正常工作,更具体地说,在其中一个值为 null
的情况下工作
我写了一个udf,当输入数据中没有空值时可以工作:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as F
import jellyfish.cjellyfish
def jaro_winkler_func(df, column_left, column_right):
jaro_winkler_udf = udf(f=lambda s1, s2: jellyfish.jaro_winkler(s1, s2), returnType=DoubleType())
df = (df
.withColumn('test',
jaro_winkler_udf(df[column_left], df[column_right])))
return df
输入输出示例:
+-----------+------------+
|string_left|string_right|
+-----------+------------+
| dude| dud|
| spud| dud|
+-----------+------------+
+-----------+------------+------------------+
|string_left|string_right| test|
+-----------+------------+------------------+
| dude| dud|0.9166666666666666|
| spud| dud|0.7222222222222222|
+-----------+------------+------------------+
当我在具有空值的数据上运行它时,我会得到通常的大量火花错误,最适用的似乎是TypeError: str argument expected
。我认为这是由于数据中的null
值,因为它在没有值的情况下起作用。
我修改了上面的函数来检查两个值是否不为空,只有在这种情况下才运行该函数,否则返回 0。
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as F
import jellyfish.cjellyfish
def jaro_winkler_func(df, column_left, column_right):
jaro_winkler_udf = udf(f=lambda s1, s2: jellyfish.jaro_winkler(s1, s2), returnType=DoubleType())
df = (df
.withColumn('test',
F.when(df[column_left].isNotNull() & df[column_right].isNotNull(),
jaro_winkler_udf(df[column_left], df[column_right]))
.otherwise(0.0)))
return df
但是,我仍然遇到与以前相同的错误。
示例输入和我想要的输出:
+-----------+------------+
|string_left|string_right|
+-----------+------------+
| dude| dud|
| spud| dud|
| spud| null|
| null| null|
+-----------+------------+
+-----------+------------+------------------+
|string_left|string_right| test|
+-----------+------------+------------------+
| dude| dud|0.9166666666666666|
| spud| dud|0.7222222222222222|
| spud| null|0.0 |
| null| null|0.0 |
+-----------+------------+------------------+
【问题讨论】:
你试过用空字符串替换空值吗? @Chris 我想过但没有尝试过。即使它确实有效,我也不认为这是一个很好的解决方案,因为如果我必须将 null 转换为空字符串,运行所有比较然后将它们转回 null。此外,当它扩展到数百或数千亿条记录时,我宁愿不让它相互比较大量的空字符串,只是为了得到一个已知的零值,因为其中一个值为空。 您需要使您的udf
对不良输入具有鲁棒性。请参阅this answer 中的解释,但本质上您需要类似:udf(f=lambda s1, s2: jellyfish.jaro_winkler(s1, s2) if s1 is not None and s2 is not None else None, returnType=DoubleType())
Using UDF ignores condition in when的可能重复
@pault 这可以解决问题。不知道它会如何执行。我可能需要编写一个 scala 函数,然后以某种方式让它与 pyspark 搭配得很好。
【参考方案1】:
我们将对您的代码稍作修改,它应该可以正常工作:
@udf(DoubleType())
def jaro_winkler(s1, s2):
if not all((s1, s2)): # or, if None in (s1, s2):
out = 0
else:
out = jellyfish.jaro_winkler(s1, s2)
return out
def jaro_winkler_func(df, column_left, column_right):
df = df.withColumn("test", jaro_winkler(df[column_left], df[column_right]))
return df
根据预期的行为,您需要更改测试:
if not all((s1, s2)):
将为 null
和空返回 0
字符串''
。
if None in (s1, s2):
将只为 null
返回 0
【讨论】:
如果s1
或s2
是空字符串怎么办?
好吧,这就是python的魅力,它当然会返回0 :)
但这不一定是正确答案(取决于jellyfish.jaro_winkler
所做的),因为None
与空字符串不同。
@pault 我同意,但实际上,这只是第一次更改为s1 is None or s2 is None
的测试
@pault 我做了一个包含两种情况的编辑。以上是关于Pyspark:如何处理 python 用户定义函数中的空值的主要内容,如果未能解决你的问题,请参考以下文章
如何处理我在尝试通过 Pyspark 从 SQL 写入 Kudu 时遇到的这个错误