将可为空的列作为参数传递给 Spark SQL UDF

Posted

技术标签:

【中文标题】将可为空的列作为参数传递给 Spark SQL UDF【英文标题】:Passing nullable columns as parameter to Spark SQL UDF 【发布时间】:2017-09-05 09:45:52 【问题描述】:

这是一个 Spark UDF,我用它来计算使用几列的值。

def spark_udf_func(s: String, i:Int): Boolean =  
    // I'm returning true regardless of the parameters passed to it.
    true


val spark_udf = org.apache.spark.sql.functions.udf(spark_udf_func _)

val df = sc.parallelize(Array[(Option[String], Option[Int])](
  (Some("Rafferty"), Some(31)), 
  (null, Some(33)), 
  (Some("Heisenberg"), Some(33)),  
  (Some("Williams"), null)
)).toDF("LastName", "DepartmentID")

df.withColumn("valid", spark_udf(df.col("LastName"), df.col("DepartmentID"))).show()
+----------+------------+-----+
|  LastName|DepartmentID|valid|
+----------+------------+-----+
|  Rafferty|          31| true|
|      null|          33| true|
|Heisenberg|          33| true|
|  Williams|        null| null|
+----------+------------+-----+

谁能解释为什么最后一行的有效列的值为空?

当我检查 spark 计划时,我发现该计划有一个 case 条件,即如果 column2 (DepartmentID) 为 null,则它必须返回 null。

== Physical Plan ==

*Project [_1#699 AS LastName#702, _2#700 AS DepartmentID#703, if (isnull(_2#700)) null else UDF(_1#699, _2#700) AS valid#717]
+- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), assertnotnull(input[0, scala.Tuple2, true])._1), true) AS _1#699, unwrapoption(IntegerType, assertnotnull(input[0, scala.Tuple2, true])._2) AS _2#700]
   +- Scan ExternalRDDScan[obj#698]

为什么我们在 Spark 中有这样的行为? 为什么只有整数列? 我在这里做错了什么,当 UDF 参数为空时,在 UDF 中处理空值的正确方法是什么?

【问题讨论】:

另见***.com/questions/42791912/… 【参考方案1】:

问题是 null 不是 scala Int 的有效值(它是支持值),而它是 String 的有效值。 Int 等价于 java int 原语并且必须有一个值。这意味着当值为 null 时无法调用 udf,因此 null 仍然存在。

有两种方法可以解决这个问题:

    更改函数以接受 java.lang.Integer(它是一个对象,可以为 null) 如果你不能改变函数,你可以使用when/otherwise来做一些特殊的事情以防null。例如 when(col("int col").isNull, someValue).otherwise(原始调用)

可以在here找到一个很好的解释

【讨论】:

还有第三个选项允许您坚持使用 Scala Int:将参数打包到结构中(使用 df.withColumn("valid", spark_udf(struct(df.col("LastName"), df.col("DepartmentID")))))并使用 Row 作为 udf 的输入参数。在 udf 中,您可以使用 row.isNullAt(i: Int) 检查该行的空值【参考方案2】:

要接受 null,请使用 Integer(Java 数据类型而不是 Scala Int)

def spark_udf_func(s: String, i:Integer): Boolean =  
    // I'm returning true regardless of the parameters passed to it.
    if(i == null) false else true

【讨论】:

以上是关于将可为空的列作为参数传递给 Spark SQL UDF的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 数据框中添加可为空的列

如何使用 typeorm 将可为空的数据库字段设置为 NULL?

将可为空的 DateTime 字段更新为 null 会导致默认 DateTime 值 (0001-01-01 00:00:00.0000000)

将表中的列值作为参数传递给存储过程 sql

代码首先为必填字段生成可为空的列

在 PSQL 中为可为空的列添加唯一约束