将无效数据设置为 Spark DataFrames 中的缺失数据

Posted

技术标签:

【中文标题】将无效数据设置为 Spark DataFrames 中的缺失数据【英文标题】:Setting invalid data to missing data in Spark DataFrames 【发布时间】:2015-11-18 14:30:25 【问题描述】:

令 x 为定义为(在 Scala 中)的两列字符串的数据框

case class Pair(X: String, Y: String)

val x = sqlContext.createDataFrame(Seq(
   Pair("u1", "1"), 
   Pair("u2", "wrong value"), 
   Pair("u3", "5"), 
   Pair("u4", "2")
))

我想清理这个数据框,使第二列的每个值都是

    如果可能,转换为 Int 替换为 null、Na 或任何表示“缺失值”的符号(不是 NaN,这是不同的)

我正在考虑使用 udf 函数

val stringToInt = udf[Int, String](x => try 
     x.toInt
    catch 
     case e: Exception => null
   )

x.withColumn("Y", stringToInt(x("Y")))

... 但是 null 不是字符串,编译器会拒绝它。请问,有什么办法解决吗?只要我可以清理我的数据框,完全不同的方法也可以

【问题讨论】:

注意 val x = sqlContext.createDataFrame(Seq(Pair("u1", "a1"), Pair("u1", "a2"), Pair("u2", "a1" ), Pair("u2", null))) 返回一个由两列字符串组成的数据框,其中包含一个空值,我们可以使用 x.na.drop 等标准工具 【参考方案1】:

实际上,在这种特殊情况下,不需要 UDF。相反,您可以安全地使用Column.cast 方法:

import org.apache.spark.sql.types.IntegerType
val clean = x.withColumn("Y", $"Y".cast(IntegerType)) // or .cast("integer")

clean.where($"Y".isNotNull).show
// +---+---+
// |  X|  Y|
// +---+---+
// | u1|  1|
// | u3|  5|
// | u4|  2|
// +---+---+

clean.where($"Y".isNull).show
// +---+----+
// |  X|   Y|
// +---+----+
// | u2|null|
// +---+----+

【讨论】:

【参考方案2】:

不要使用null,而是使用Option[Int]

val pairs = Seq(
   Pair("u1", "1"), 
   Pair("u2", "wrong value"), 
   Pair("u3", "5"), 
   Pair("u4", "2")
)

def toInt(s: String): Option[Int] = try  Some(s.toInt)  catch  case NumberFormatException => None 

val stringToInt = udf[Int, Option[Int]](toInt _)

那你就可以了

val x = sqlContext.createDataFrame(pairs)
x.withColumn("Y", stringToInt(x("Y")))

【讨论】:

请检查我的修改 为什么不简单地Try(s.toInt).toOption @zero323 没有真正的理由,这也是合适的。它可能会生成不同的字节码 会的,但我相信这里会更惯用,更不用说更短了:) 无论如何+1。 @zero323 它在语义上与我列出的不同,因为它捕获任何异常(可能更好或更糟),并且几乎可以肯定地将值比原始 try/catch 更多地装箱(可能重要也可能不重要)

以上是关于将无效数据设置为 Spark DataFrames 中的缺失数据的主要内容,如果未能解决你的问题,请参考以下文章

Spark - 如何将 JSON 转义的字符串字段解析为 DataFrames 中的 JSON 对象?

将 Spark Dataframes 的每一行转换为一个字符串,并在 scala 中的每列值之间使用分隔符

是否可以将 Spark 中的 data.table 与 Spark Dataframes 一起使用?

使用 Dataframes 的 Spark Overlap 算法

使用 pyspark 在循环中附加 Spark DataFrames 的有效方法

使用 Dataframes 从 Informix 到 Spark 的 JDBC