在 Spark 中加载非标准格式的 CSV
Posted
技术标签:
【中文标题】在 Spark 中加载非标准格式的 CSV【英文标题】:Load CSV in Spark with types in non standard format 【发布时间】:2018-04-17 03:27:42 【问题描述】:我有一个我想用 Spark 读取的 csv 文件,指定一个模式来获取我需要的类型。类似的东西:
Dataset<Row> ds = sqlContext.read()
.format("csv")
.option("header", "false")
.schema(customSchema)
.load("myCsvFilePath.csv");
但在我的 csv 文件中,某些列是以非标准方式记录的,例如双精度值使用逗号作为小数分隔符,或者日期时间值是格式为 dd.MM.yyyy 的字符串。 是否可以定义这样的模式?或者我应该将这些列作为字符串读取,然后显式解析它们?
【问题讨论】:
我建议在喂给 scala 之前进行文件清理。这很容易,并且会确保文件中没有垃圾分隔符。 请提供一些数据样本 "10655",31.10.2017,"851869631,35","906126633,45","473","860" "10425",25.03.2017,"1184646465,14"," 2090611791,58","13467","37114" 我会按照@ArmonRotemGalOz 的建议去做。将所有内容读取为字符串并修复它。不幸的是,您不能在阅读时应用部分模式。我们都经历过这个。 【参考方案1】:将奇数格式转换为标准格式是您希望使用 spark 的 dataprep 管道的一部分 - 因此,可以将这些列作为字符串读取,然后使用内置函数或 udf,您可以将列替换为固定格式(例如使用 withColumn)
import org.apache.spark.sql.functions._
df.withColumn("fixed_date",unix_timestamp(col("date_column"),"dd.MM.YYYY")).withColumn("fixed_double",regexp_replace(col("double_column"),",",".").cast("double"))
【讨论】:
spark中是否有内置函数可以解析Date和Double?或者我应该提供 udf 包装 SimpleDateFormat / DecimalFormat? import org.apache.spark.sql.functions._ 你可以使用 unix_timestamp 转换日期和 regex_replace 删除逗号,然后转换为 double 来转换双打。以上是关于在 Spark 中加载非标准格式的 CSV的主要内容,如果未能解决你的问题,请参考以下文章
使用 sparklyr 时无法在本地 Spark 连接中加载 .csv 数据
如何使用 Spark 加载 JSON(保存在 csv 中的路径)?