Spark:在 CASE WHEN 表达式中使用空检查来防止类型错误

Posted

技术标签:

【中文标题】Spark:在 CASE WHEN 表达式中使用空检查来防止类型错误【英文标题】:Spark: Using null checking in a CASE WHEN expression to protect against type errors 【发布时间】:2021-10-25 08:31:05 【问题描述】:

我有一个 UDF,它比较两个字符串 str_leftstr_right,但如果其中一个为空,则失败。

我认为应该可以使用 case 表达式来“保护”udf,如下所示:

select 
    case 
    when str_left is null or str_right is null then -1 
    else my_udf(str_left, str_right)
    end as my_col
from my_table

但这在实践中失败了。为什么这不起作用?

这是 pyspark 中的完整示例,它在 Spark 2.4.3 和 Spark 3.1.2 中产生错误 TypeError: object of type 'NoneType' has no len()

from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
from pyspark.sql import Row

def rel_length(str1, str2):   
    return len(str1)/len(str2)

spark.udf.register("rel_length_py", rel_length, DoubleType())

rows = [
    "str_col_l": "a string", "str_col_r": "another string",
    "str_col_l": "a string", "str_col_r": None,

]
df = spark.createDataFrame(Row(**x) for x in rows)
df.createOrReplaceTempView("str_comp")

sql = """select 
    case
    when str_col_r is null or str_col_l is null  then -1
    else rel_length_py(str_col_l, str_col_r) 
    end
    as rel
from str_comp
"""
spark.sql(sql).show()

我已尝试将其简化为上述可重现的示例。我们遇到的“现实世界”问题是与this udf 类似的案例陈述。 Here's a gist 带有产生错误的代码。 Strangley,在这个更复杂的示例中,它在 spark 3.1.2 中失败,但在 2.4.3 中成功。

【问题讨论】:

这能回答你的问题吗? Applying UDF only on rows where value is not null or not an empty string not working as expected 谢谢 - 这非常有帮助。您对上面的评论可能是根本原因:“我认为优化器为了节省计算时间,计算真假输出,然后根据结果选择正确的输出”。是否有可能详细说明这一点,例如通过引用源代码或其他更详细的解释?即你有多确定这是真的? 我说I think 因为我不完全确定。许多系统使用相同的方法,但在遇到错误时不会崩溃。这里的问题是在代码中理解为什么错误没有被捕获和忽略。但是我没有花时间阅读代码并理解整个机制,对不起 没问题 - 这是一个非常有用的提示。我查看了代码here,但我有点超出我的深度。我会做一些测试,看看我是否可以验证这种行为并报告回来。 【参考方案1】:

在许多情况下,Spark 优化器将执行您的 case 表达式的所有部分,即使有些部分似乎无法访问。

在问题中给出的示例中,我们可以证明 Spark 执行 BOTH:

when str_col_r is null or str_col_l is null  then -1

else rel_length_py(str_col_l, str_col_r) 

即使在 str_col_r is nullstr_col_l is null 的情况下

这是一些示例代码。数据框如下,第二行重复100次。


| str_col_l   | str_col_r   |
|:------------|:------------|
| a           | b           |
| a string    | null        |
| a string    | null        |
| a string    | null        |
...96 repeats...
| a string    | null        |

我已经设置:

conf.set("spark.sql.shuffle.partitions", "1")
conf.set("spark.default.parallelism", "1")

我们运行一个在执行时休眠 1 秒的 UDF:

%%time

def rel_length(str1, str2):   
    time.sleep(1)
    if str1 is None or str2 is None:
        return -0.9

    return len(str1)/len(str2)

spark.udf.register("rel_length_py", rel_length, DoubleType())

rows = ["str_col_l": "a", "str_col_r": "b"] +  ["str_col_l": "a string", "str_col_r": None]*100

df = spark.createDataFrame(Row(**x) for x in rows)
df.createOrReplaceTempView("str_comp")

sql = """select 
    case
    when str_col_r is null or str_col_l is null  then -1
    else rel_length_py(str_col_l, str_col_r) 
    end
    as rel
from str_comp
"""
spark.sql(sql).toPandas().head(2)
CPU times: user 183 ms, sys: 61.8 ms, total: 245 ms
Wall time: 1min 46s

即大约 100 秒。

这里 sleep 语句在 UDF 中重新定位,因此它仅在第一行休眠 1 秒。

%%time

def rel_length(str1, str2):   
    if str1 is None or str2 is None:
        return -0.9
    time.sleep(1)
    return len(str1)/len(str2)

spark.udf.register("rel_length_py", rel_length, DoubleType())

rows = ["str_col_l": "a", "str_col_r": "b"] +  ["str_col_l": "a string", "str_col_r": None]*100

df = spark.createDataFrame(Row(**x) for x in rows)
df.createOrReplaceTempView("str_comp")

sql = """select 
    case
    when str_col_r is null or str_col_l is null  then -1
    else rel_length_py(str_col_l, str_col_r) 
    end
    as rel
from str_comp
"""
spark.sql(sql).toPandas().head(2)
CPU times: user 14.5 ms, sys: 6.42 ms, total: 20.9 ms
Wall time: 1.36 s

这证明至少在某些情况下 case 语句的所有部分都会执行。我不相信所有部分都保证可以执行,因为我已经看到如果所有部分都在执行时会出错的工作示例。

【讨论】:

【参考方案2】:

有趣的是,由于某种原因,即使 spark 不使用结果,它也会执行 udf(至少对于某些行)。有时催化剂会设法不这样做,例如当使用 lit(None) 生成列时,但大多数时候它会这样做。

解决此问题的最简单方法是修改您的 udf 以处理这种情况:

def rel_lenght(str1, str2):
    if str1 is None or str2 is None:
        return -1
    else:
        len(str1) / len(str2)

【讨论】:

以上是关于Spark:在 CASE WHEN 表达式中使用空检查来防止类型错误的主要内容,如果未能解决你的问题,请参考以下文章

mysql中case when的用法

SQL语句中case,when,then的用法

spark2.1:使用df.select(when(a===b,1).otherwise)替换(case when a===b then 1 else 0 end)

spark-sql case when 问题

oracle中case when使用

Oracle SQL 查询使用 case when,压缩空字段