Pyspark:如何处理 python 用户定义函数中的空值

Posted

技术标签:

【中文标题】Pyspark:如何处理 python 用户定义函数中的空值【英文标题】:Pyspark: How to deal with null values in python user defined functions 【发布时间】:2019-05-07 15:16:21 【问题描述】:

我想使用一些不是 pyspark 原生的字符串相似性函数,例如数据帧上的 jaro 和 jaro-winkler 度量。这些在 jellyfish 等 python 模块中很容易获得。对于没有 null 值的情况,我可以写 pyspark udf 的罚款,即比较猫和狗。当我将这些 udf 应用于存在 null 值的数据时,它不起作用。在诸如我正在解决的问题中,其中一个字符串为null

是很常见的

我需要帮助让我的字符串相似性 udf 正常工作,更具体地说,在其中一个值为 null 的情况下工作

我写了一个udf,当输入数据中没有空值时可以工作:

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as F
import jellyfish.cjellyfish

def jaro_winkler_func(df, column_left, column_right):

    jaro_winkler_udf = udf(f=lambda s1, s2: jellyfish.jaro_winkler(s1, s2), returnType=DoubleType())

    df = (df
          .withColumn('test',
                      jaro_winkler_udf(df[column_left], df[column_right])))

    return df

输入输出示例:

+-----------+------------+
|string_left|string_right|
+-----------+------------+
|       dude|         dud|
|       spud|         dud|
+-----------+------------+
+-----------+------------+------------------+
|string_left|string_right|              test|
+-----------+------------+------------------+
|       dude|         dud|0.9166666666666666|
|       spud|         dud|0.7222222222222222|
+-----------+------------+------------------+

当我在具有空值的数据上运行它时,我会得到通常的大量火花错误,最适用的似乎是TypeError: str argument expected。我认为这是由于数据中的null 值,因为它在没有值的情况下起作用。

我修改了上面的函数来检查两个值是否不为空,只有在这种情况下才运行该函数,否则返回 0。

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as F
import jellyfish.cjellyfish

def jaro_winkler_func(df, column_left, column_right):

    jaro_winkler_udf = udf(f=lambda s1, s2: jellyfish.jaro_winkler(s1, s2), returnType=DoubleType())

    df = (df
       .withColumn('test',
                   F.when(df[column_left].isNotNull() & df[column_right].isNotNull(),
                          jaro_winkler_udf(df[column_left], df[column_right]))
                   .otherwise(0.0)))

    return df

但是,我仍然遇到与以前相同的错误。

示例输入和我想要的输出:

+-----------+------------+
|string_left|string_right|
+-----------+------------+
|       dude|         dud|
|       spud|         dud|
|       spud|        null|
|       null|        null|
+-----------+------------+
+-----------+------------+------------------+
|string_left|string_right|              test|
+-----------+------------+------------------+
|       dude|         dud|0.9166666666666666|
|       spud|         dud|0.7222222222222222|
|       spud|        null|0.0               |
|       null|        null|0.0               |
+-----------+------------+------------------+

【问题讨论】:

你试过用空字符串替换空值吗? @Chris 我想过但没有尝试过。即使它确实有效,我也不认为这是一个很好的解决方案,因为如果我必须将 null 转换为空字符串,运行所有比较然后将它们转回 null。此外,当它扩展到数百或数千亿条记录时,我宁愿不让它相互比较大量的空字符串,只是为了得到一个已知的零值,因为其中一个值为空。 您需要使您的udf 对不良输入具有鲁棒性。请参阅this answer 中的解释,但本质上您需要类似:udf(f=lambda s1, s2: jellyfish.jaro_winkler(s1, s2) if s1 is not None and s2 is not None else None, returnType=DoubleType()) Using UDF ignores condition in when的可能重复 @pault 这可以解决问题。不知道它会如何执行。我可能需要编写一个 scala 函数,然后以某种方式让它与 pyspark 搭配得很好。 【参考方案1】:

我们将对您的代码稍作修改,它应该可以正常工作:

@udf(DoubleType())
def jaro_winkler(s1, s2):
    if not all((s1, s2)):  # or, if None in (s1, s2):
        out = 0
    else:
        out = jellyfish.jaro_winkler(s1, s2)
    return out


def jaro_winkler_func(df, column_left, column_right):
    df = df.withColumn("test", jaro_winkler(df[column_left], df[column_right]))
    return df

根据预期的行为,您需要更改测试:

if not all((s1, s2)): 将为 null 和空返回 0 字符串''if None in (s1, s2): 将只为 null 返回 0

【讨论】:

如果s1s2 是空字符串怎么办? 好吧,这就是python的魅力,它当然会返回0 :) 但这不一定是正确答案(取决于jellyfish.jaro_winkler 所做的),因为None 与空字符串不同。 @pault 我同意,但实际上,这只是第一次更改为s1 is None or s2 is None的测试 @pault 我做了一个包含两种情况的编辑。

以上是关于Pyspark:如何处理 python 用户定义函数中的空值的主要内容,如果未能解决你的问题,请参考以下文章

如何处理我在尝试通过 Pyspark 从 SQL 写入 Kudu 时遇到的这个错误

如何处理自定义技能中用户的“是”/“否”响应?

如何处理 UiTableviewCell 文本字段?

如何在 tkinter GUI 位于同一文件中的情况下运行 python 脚本?您如何处理用户条目?

Tornado Python 如何处理客户端连接丢失

Python 的“open()”为“找不到文件”抛出不同的错误——如何处理这两个异常?