在 Spark DataFrame 中将空值转换为空数组

Posted

技术标签:

【中文标题】在 Spark DataFrame 中将空值转换为空数组【英文标题】:Convert null values to empty array in Spark DataFrame 【发布时间】:2016-01-07 16:55:40 【问题描述】:

我有一个 Spark 数据框,其中一列是一个整数数组。该列可以为空,因为它来自左外连接。我想将所有空值转换为一个空数组,这样我以后就不必处理空值了。

我以为我可以这样做:

val myCol = df("myCol")
df.withColumn( "myCol", when(myCol.isNull, Array[Int]()).otherwise(myCol) )

但是,这会导致以下异常:

java.lang.RuntimeException: Unsupported literal type class [I [I@5ed25612
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:49)
at org.apache.spark.sql.functions$.lit(functions.scala:89)
at org.apache.spark.sql.functions$.when(functions.scala:778)

when 函数显然不支持数组类型。还有其他简单的方法来转换空值吗?

如果相关,这里是此列的架构:

|-- myCol: array (nullable = true)
|    |-- element: integer (containsNull = false)

【问题讨论】:

看看coalesce sql函数docs.oracle.com/database/121/SQLRF/functions033.htm#SQLRF00617 【参考方案1】:

您可以使用 UDF:

import org.apache.spark.sql.functions.udf

val array_ = udf(() => Array.empty[Int])

结合WHENCOALESCE:

df.withColumn("myCol", when(myCol.isNull, array_()).otherwise(myCol))
df.withColumn("myCol", coalesce(myCol, array_())).show

最近的版本中你可以使用array函数:

import org.apache.spark.sql.functions.array, lit

df.withColumn("myCol", when(myCol.isNull, array().cast("array<integer>")).otherwise(myCol))
df.withColumn("myCol", coalesce(myCol, array().cast("array<integer>"))).show

请注意,只有在允许从 string 转换为所需类型的情况下,它才会起作用。

同样的事情当然也可以在 PySpark 中完成。对于遗留解决方案,您可以定义udf

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType

def empty_array(t):
    return udf(lambda: [], ArrayType(t()))()

coalesce(myCol, empty_array(IntegerType()))

在最近的版本中只需使用array:

from pyspark.sql.functions import array

coalesce(myCol, array().cast("array<integer>"))

【讨论】:

感谢您的帮助。我之前实际上尝试过 UDF,但没想到实际上调用 apply (即我正在做 array_ 而不是 array_())。 @zero323 你会如何在 pyspark 中做到这一点? @harppu 这为我回答了 pyspark:***.com/a/57198009/503826【参考方案2】:

对 zero323 的方法稍作修改后,我无需在 Spark 2.3.1 中使用 udf 即可做到这一点。

val df = Seq("a" -> Array(1,2,3), "b" -> null, "c" -> Array(7,8,9)).toDF("id","numbers")
df.show
+---+---------+
| id|  numbers|
+---+---------+
|  a|[1, 2, 3]|
|  b|     null|
|  c|[7, 8, 9]|
+---+---------+

val df2 = df.withColumn("numbers", coalesce($"numbers", array()))
df2.show
+---+---------+
| id|  numbers|
+---+---------+
|  a|[1, 2, 3]|
|  b|       []|
|  c|[7, 8, 9]|
+---+---------+

【讨论】:

在 PySpark 中,您可以使用第二种方法,只需使用 df2 = df.withColumn("numbers", coalesce(col("numbers"), array()))【参考方案3】:

当您希望数组元素的数据类型无法从 StringType 转换时,可以使用以下无 UDF 替代方案:

import pyspark.sql.types as T
import pyspark.sql.functions as F

df.withColumn(
    "myCol",
    F.coalesce(
        F.col("myCol"),
        F.from_json(F.lit("[]"), T.ArrayType(T.IntegerType()))
    )
)

您可以将IntegerType() 替换为任何数据类型,也可以是复杂数据类型。

【讨论】:

以上是关于在 Spark DataFrame 中将空值转换为空数组的主要内容,如果未能解决你的问题,请参考以下文章

在scala中将Spark Dataframe转换为RDD

如何在 pyspark 中将 DenseMatrix 转换为 spark DataFrame?

如何在 Spark 中将 JavaPairInputDStream 转换为 DataSet/DataFrame

如何在 Scala(Spark 2.0)中将带有字符串的 DataFrame 转换为带有 Vectors 的 DataFrame

在 Spark Dataframe API 中将出生日期转换为年龄

Spark DataFrame 空值到数据集