在 Spark 中加载非标准格式的 CSV

Posted

技术标签:

【中文标题】在 Spark 中加载非标准格式的 CSV【英文标题】:Load CSV in Spark with types in non standard format 【发布时间】:2018-04-17 03:27:42 【问题描述】:

我有一个我想用 Spark 读取的 csv 文件,指定一个模式来获取我需要的类型。类似的东西:

Dataset<Row> ds = sqlContext.read()
    .format("csv")
    .option("header", "false")
    .schema(customSchema)
    .load("myCsvFilePath.csv");

但在我的 csv 文件中,某些列是以非标准方式记录的,例如双精度值使用逗号作为小数分隔符,或者日期时间值是格式为 dd.MM.yyyy 的字符串。 是否可以定义这样的模式?或者我应该将这些列作为字符串读取,然后显式解析它们?

【问题讨论】:

我建议在喂给 scala 之前进行文件清理。这很容易,并且会确保文件中没有垃圾分隔符。 请提供一些数据样本 "10655",31.10.2017,"851869631,35","906126633,45","473","860" "10425",25.03.2017,"1184646465,14"," 2090611791,58","13467","37114" 我会按照@ArmonRotemGalOz 的建议去做。将所有内容读取为字符串并修复它。不幸的是,您不能在阅读时应用部分模式。我们都经历过这个。 【参考方案1】:

将奇数格式转换为标准格式是您希望使用 spark 的 dataprep 管道的一部分 - 因此,可以将这些列作为字符串读取,然后使用内置函数或 udf,您可以将列替换为固定格式(例如使用 withColumn)

import org.apache.spark.sql.functions._ 

df.withColumn("fixed_date",unix_timestamp(col("date_column"),"dd.MM.YYYY")).withColumn("fixed_double",regexp_replace(col("double_column"),",",".").cast("double"))

【讨论】:

spark中是否有内置函数可以解析Date和Double?或者我应该提供 udf 包装 SimpleDateFormat / DecimalFormat? import org.apache.spark.sql.functions._ 你可以使用 unix_timestamp 转换日期和 regex_replace 删除逗号,然后转换为 double 来转换双打。

以上是关于在 Spark 中加载非标准格式的 CSV的主要内容,如果未能解决你的问题,请参考以下文章

使用 sparklyr 时无法在本地 Spark 连接中加载 .csv 数据

无法在 Spark 中将 CSV 文件加载为数据框

如何使用 Spark 加载 JSON(保存在 csv 中的路径)?

如何使用 Spark 数据帧将 csv 数据加载到配置单元中?

使用scala在sql表中加载csv文件

谁在 Apache Spark 中将分区加载到 RAM 中?