如何验证 Spark Dataframe 的内容

Posted

技术标签:

【中文标题】如何验证 Spark Dataframe 的内容【英文标题】:How to Validate contents of Spark Dataframe 【发布时间】:2015-10-21 23:22:26 【问题描述】:

我有以下 Scala Spark 代码库,效果很好,但不应该。

第 2 列有混合类型的数据,而在 Schema 中我将其定义为 IntegerType。我的实际程序有超过 100 列,并且在转换后继续派生多个子 DataFrames

如何验证 RDDDataFrame 字段的内容是否具有正确的数据类型值,从而忽略无效行或将列的内容更改为某个默认值。感谢您使用DataFrameRDD 进行数据质量检查的更多指针。

var theSeq = Seq(("X01", "41"),
    ("X01", 41),
    ("X01", 41),
    ("X02", "ab"),
    ("X02", "%%"))

val newRdd = sc.parallelize(theSeq)
val rowRdd = newRdd.map(r => Row(r._1, r._2))

val theSchema = StructType(Seq(StructField("ID", StringType, true),
    StructField("Age", IntegerType, true)))
val theNewDF = sqc.createDataFrame(rowRdd, theSchema)
theNewDF.show()  

【问题讨论】:

【参考方案1】:

首先传递schema 只是一种避免类型推断的方法。在 DataFrame 创建期间不会验证或强制执行它。在旁注中,我不会将ClassCastException 描述为运行良好。有一瞬间我以为你真的发现了一个错误。

我认为重要的问题是你首先如何获得theSeq / newRdd 这样的数据。它是您自己解析的东西,是从外部组件接收的吗?只需查看类型(分别为Seq[(String, Any)] / RDD[(String, Any)]),您就已经知道它不是DataFrame 的有效输入。可能在这个级别处理事情的方法是采用静态类型。 Scala 提供了很多巧妙的方法来处理意外情况(TryEitherOption),其中最后一种是最简单的一种,并且可以很好地与 Spark SQL 配合使用。处理事情的相当简单的方式可能看起来像这样

def validateInt(x: Any) = x match 
  case x: Int => Some(x)
  case _ => None


def validateString(x: Any) = x match  
  case x: String => Some(x)
  case _ => None


val newRddOption: RDD[(Option[String], Option[Int])] = newRdd.map
  case (id, age) => (validateString(id), validateInt(age))

由于Options 可以轻松组合,您可以添加额外的检查,如下所示:

def validateAge(age: Int) = 
  if(age >= 0 && age < 150) Some(age)
  else None


val newRddValidated: RDD[(Option[String], Option[Int])] = newRddOption.map
  case (id, age) => (id, age.flatMap(validateAge))

接下来,我将使用案例类,而不是 Row,这是一个非常粗糙的容器:

case class Record(id: Option[String], age: Option[Int])

val records: RDD[Record] = newRddValidated.mapcase (id, age) => Record(id, age)

此时您只需拨打toDF

import org.apache.spark.sql.DataFrame

val df: DataFrame = records.toDF
df.printSchema

// root
//  |-- id: string (nullable = true)
//  |-- age: integer (nullable = true)

这是一种很难但可以说是一种更优雅的方式。一个更快的方法是让 SQL 强制转换系统为您完成工作。首先让我们将所有内容都转换为Strings

val stringRdd: RDD[(String, String)] = sc.parallelize(theSeq).map(
  p => (p._1.toString, p._2.toString))

接下来创建一个DataFrame:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

val df: DataFrame = stringRdd.toDF("id", "age")

val expectedTypes = Seq(StringType, IntegerType)
val exprs: Seq[Column] = df.columns.zip(expectedTypes).map
  case (c, t) => col(c).cast(t).alias(c)

val dfProcessed: DataFrame = df.select(exprs: _*)

结果:

dfProcessed.printSchema

// root
//  |-- id: string (nullable = true)
//  |-- age: integer (nullable = true)


dfProcessed.show

// +---+----+
// | id| age|
// +---+----+
// |X01|  41|
// |X01|  41|
// |X01|  41|
// |X02|null|
// |X02|null|
// +---+----+

【讨论】:

我发布的代码只是为了证明我遇到的问题....非常感谢您的回复....虽然第一个解决方案很优雅,但是当它的 125 列在文件中时几乎不起作用...并且文件本质上是动态的...即使案例类也不起作用....因此 Row 是唯一的解决方案....铸造解决方案在我的情况下是合适的..非常感谢。 SS 这就是为什么要有元编程。可以说,有更好的语言比 Scala 更自然,但这是完全可能的。【参考方案2】:

在 1.4 或更早的版本中

import org.apache.spark.sql.execution.debug._
theNewDF.typeCheck

不过,它已通过 SPARK-9754 删除。我还没有检查,但我认为typeCheck 事先变成了sqlContext.debug

【讨论】:

以上是关于如何验证 Spark Dataframe 的内容的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark 中对嵌套的 Dataframe 进行平面映射

如何在 Scala(Spark 2.0)中将带有字符串的 DataFrame 转换为带有 Vectors 的 DataFrame

如何优化 spark 函数以用零替换空值?

查看 Spark Dataframe 列的内容

Note_Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))

Spark dataframe 列内容修改