使用 Spark 验证 CSV 文件列

Posted

技术标签:

【中文标题】使用 Spark 验证 CSV 文件列【英文标题】:Validate CSV file columns with Spark 【发布时间】:2020-03-13 15:53:07 【问题描述】:

我正在尝试在 Spark 中读取一个 CSV 文件(应该有一个标题)并将数据加载到现有表中(具有预定义的列和数据类型)。 csv 文件可能非常大,所以如果 csv 中的列标题不是“有效”,我可以避免这样做会很棒。

当我当前正在读取文件时,我将 StructType 指定为架构,但这并不能验证标题是否包含正确顺序的正确列。 这就是我目前所拥有的(我正在另一个地方构建“模式”StructType):

sqlContext
  .read()
  .format("csv")
  .schema(schema)
  .load("pathToFile");

如果我添加 .option("header", "true)" 行,它将覆盖 csv 文件的第一行,并使用我在 StructType 的 add 方法中传递的名称。 (例如,如果我使用“id”和“name”构建 StructType,并且 csv 中的第一行是“idzzz,name”,则生成的数据框将包含“id”和“name”列。我希望能够验证csv 标题与我计划加载 csv 的表具有相同的列名称。

我尝试使用.head() 读取文件,并对第一行进行一些检查,但会下载整个文件。

欢迎提出任何建议。

【问题讨论】:

所以你真正想要的是读取文件的第一行,检查它是否等于一个字符串,然后决定是否要进一步处理你的文件,对吧?如果是这种情况,您是否考虑过不使用 spark 执行此处理步骤?在启动 spark 作业之前,甚至在文件上运行 bash 脚本之前,在驱动程序中使用纯 java 执行此操作可能会更容易。 基本上,是的。仍在考虑在 spark 之外提取“标题”部分。 【参考方案1】:

据我了解,您想验证您阅读的 CSV 的架构。 schema 选项的问题在于它的目标是告诉 spark 它是您的数据的架构,而不是检查它是否是。

但是,有一个选项可以在读取 CSV 时推断出上述架构,这在您的情况下可能非常有用 (inferSchema)。然后,您可以将该架构与您期望的equals 进行比较,或者执行我将介绍的更宽松一点的小变通方法。

让我们看看下面的文件是如何工作的:

a,b
1,abcd
2,efgh

然后,让我们读取数据。我使用了 scala REPL,但您应该能够非常轻松地在 Java 中转换所有这些。

val df = spark.read
    .option("header", true) // reading the header
    .option("inferSchema", true) // infering the sschema
    .csv(".../file.csv")
// then let's define the schema you would expect
val schema = StructType(Array(StructField("a", IntegerType),
                              StructField("b", StringType)))

// And we can check that the schema spark inferred is the same as the one
// we expect:
schema.equals(df.schema)
// res14: Boolean = true

走得更远

这是一个完美的世界。实际上,如果您的架构包含例如不可为空的列或其他小的差异,那么这种基于对象严格相等的解决方案将不起作用。

val schema2 = StructType(Array(StructField("a", IntegerType, false),
                               StructField("b", StringType, true)))
// the first column is non nullable, it does not work because all the columns
// are  nullable when inferred by spark:
schema2.equals(df.schema)
// res15: Boolean = false

在这种情况下,您可能需要实现适合您的架构比较方法:

def equalSchemas(s1 : StructType, s2 : StructType) = 
  s1.indices
    .map(i => s1(i).name.toUpperCase.equals(s2(i).name.toUpperCase) &&
              s1(i).dataType.equals(s2(i).dataType))
    .reduce(_ && _)

equalSchemas(schema2, df.schema)
// res23: Boolean = true

我正在检查列的名称和类型是否匹配并且顺序是否相同。您可能需要根据需要实现不同的逻辑。

【讨论】:

以上是关于使用 Spark 验证 CSV 文件列的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 spark 进行 map-reduce 流选择 N 列,文件夹下所有 csv 文件的前 M 行?

如果 csv 列标题包含空格,则在 spark 中将 csv 转换为 parquet 会出错

在 Spark 中使用 partitionBy 保存 CSV 文件 [重复]

使用 Spark 将多个文件中的列合并到单个文件中

无法过滤存储在 spark 2.2.0 数据框中的 CSV 列

按列分组和排序csv文件spark [duplicate]