使用 Spark 验证 CSV 文件列
Posted
技术标签:
【中文标题】使用 Spark 验证 CSV 文件列【英文标题】:Validate CSV file columns with Spark 【发布时间】:2020-03-13 15:53:07 【问题描述】:我正在尝试在 Spark 中读取一个 CSV 文件(应该有一个标题)并将数据加载到现有表中(具有预定义的列和数据类型)。 csv 文件可能非常大,所以如果 csv 中的列标题不是“有效”,我可以避免这样做会很棒。
当我当前正在读取文件时,我将 StructType 指定为架构,但这并不能验证标题是否包含正确顺序的正确列。 这就是我目前所拥有的(我正在另一个地方构建“模式”StructType):
sqlContext
.read()
.format("csv")
.schema(schema)
.load("pathToFile");
如果我添加 .option("header", "true)"
行,它将覆盖 csv 文件的第一行,并使用我在 StructType 的 add
方法中传递的名称。 (例如,如果我使用“id”和“name”构建 StructType,并且 csv 中的第一行是“idzzz,name”,则生成的数据框将包含“id”和“name”列。我希望能够验证csv 标题与我计划加载 csv 的表具有相同的列名称。
我尝试使用.head()
读取文件,并对第一行进行一些检查,但会下载整个文件。
欢迎提出任何建议。
【问题讨论】:
所以你真正想要的是读取文件的第一行,检查它是否等于一个字符串,然后决定是否要进一步处理你的文件,对吧?如果是这种情况,您是否考虑过不使用 spark 执行此处理步骤?在启动 spark 作业之前,甚至在文件上运行 bash 脚本之前,在驱动程序中使用纯 java 执行此操作可能会更容易。 基本上,是的。仍在考虑在 spark 之外提取“标题”部分。 【参考方案1】:据我了解,您想验证您阅读的 CSV 的架构。 schema 选项的问题在于它的目标是告诉 spark 它是您的数据的架构,而不是检查它是否是。
但是,有一个选项可以在读取 CSV 时推断出上述架构,这在您的情况下可能非常有用 (inferSchema
)。然后,您可以将该架构与您期望的equals
进行比较,或者执行我将介绍的更宽松一点的小变通方法。
让我们看看下面的文件是如何工作的:
a,b
1,abcd
2,efgh
然后,让我们读取数据。我使用了 scala REPL,但您应该能够非常轻松地在 Java 中转换所有这些。
val df = spark.read
.option("header", true) // reading the header
.option("inferSchema", true) // infering the sschema
.csv(".../file.csv")
// then let's define the schema you would expect
val schema = StructType(Array(StructField("a", IntegerType),
StructField("b", StringType)))
// And we can check that the schema spark inferred is the same as the one
// we expect:
schema.equals(df.schema)
// res14: Boolean = true
走得更远
这是一个完美的世界。实际上,如果您的架构包含例如不可为空的列或其他小的差异,那么这种基于对象严格相等的解决方案将不起作用。
val schema2 = StructType(Array(StructField("a", IntegerType, false),
StructField("b", StringType, true)))
// the first column is non nullable, it does not work because all the columns
// are nullable when inferred by spark:
schema2.equals(df.schema)
// res15: Boolean = false
在这种情况下,您可能需要实现适合您的架构比较方法:
def equalSchemas(s1 : StructType, s2 : StructType) =
s1.indices
.map(i => s1(i).name.toUpperCase.equals(s2(i).name.toUpperCase) &&
s1(i).dataType.equals(s2(i).dataType))
.reduce(_ && _)
equalSchemas(schema2, df.schema)
// res23: Boolean = true
我正在检查列的名称和类型是否匹配并且顺序是否相同。您可能需要根据需要实现不同的逻辑。
【讨论】:
以上是关于使用 Spark 验证 CSV 文件列的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 spark 进行 map-reduce 流选择 N 列,文件夹下所有 csv 文件的前 M 行?
如果 csv 列标题包含空格,则在 spark 中将 csv 转换为 parquet 会出错
在 Spark 中使用 partitionBy 保存 CSV 文件 [重复]