如何以自定义格式加载带有时间戳的 CSV?

Posted

技术标签:

【中文标题】如何以自定义格式加载带有时间戳的 CSV?【英文标题】:How to load CSVs with timestamps in custom format? 【发布时间】:2017-04-06 15:28:09 【问题描述】:

我在 csv 文件中有一个时间戳字段,我使用 spark csv 库将其加载到数据帧中。同一段代码在我的本地计算机上运行 Spark 2.0 版本,但在 Azure Hortonworks HDP 3.5 和 3.6 上引发错误。

我检查过,Azure HDInsight 3.5 也使用相同的 Spark 版本,所以我认为这不是 Spark 版本的问题。

import org.apache.spark.sql.types._
val sourceFile = "C:\\2017\\datetest"
val sourceSchemaStruct = new StructType()
  .add("EventDate",DataTypes.TimestampType)
  .add("Name",DataTypes.StringType)
val df = spark.read
  .format("com.databricks.spark.csv")
  .option("header","true")
  .option("delimiter","|")
  .option("mode","FAILFAST")
  .option("inferSchema","false")
  .option("dateFormat","yyyy/MM/dd HH:mm:ss.SSS")
  .schema(sourceSchemaStruct)
  .load(sourceFile)

整个异常如下:

Caused by: java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
  at java.sql.Timestamp.valueOf(Timestamp.java:237)
  at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:179)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply$mcJ$sp(UnivocityParser.scala:142)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply(UnivocityParser.scala:142)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply(UnivocityParser.scala:142)
  at scala.util.Try.getOrElse(Try.scala:79)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13.apply(UnivocityParser.scala:139)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13.apply(UnivocityParser.scala:135)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$nullSafeDatum(UnivocityParser.scala:179)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9.apply(UnivocityParser.scala:135)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9.apply(UnivocityParser.scala:134)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$convert(UnivocityParser.scala:215)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.parse(UnivocityParser.scala:187)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:304)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:304)
  at org.apache.spark.sql.execution.datasources.FailureSafeParser.parse(FailureSafeParser.scala:61)
  ... 27 more

csv文件只有一行如下:

"EventDate"|"Name"
"2016/12/19 00:43:27.583"|"adam"

【问题讨论】:

【参考方案1】:

TL;DR 使用timestampFormat 选项(不是dateFormat)。


我已经成功地在最新的 Spark 版本 2.3.0-SNAPSHOT 中重现了它(由 master 构建)。

// OS shell
$ cat so-43259485.csv
"EventDate"|"Name"
"2016/12/19 00:43:27.583"|"adam"

// spark-shell
scala> spark.version
res1: String = 2.3.0-SNAPSHOT

case class Event(EventDate: java.sql.Timestamp, Name: String)
import org.apache.spark.sql.Encoders
val schema = Encoders.product[Event].schema

scala> spark
  .read
  .format("csv")
  .option("header", true)
  .option("mode","FAILFAST")
  .option("delimiter","|")
  .schema(schema)
  .load("so-43259485.csv")
  .show(false)
17/04/08 11:03:42 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 7)
java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
    at java.sql.Timestamp.valueOf(Timestamp.java:237)
    at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:167)
    at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply$mcJ$sp(UnivocityParser.scala:146)
    at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply(UnivocityParser.scala:146)
    at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply(UnivocityParser.scala:146)
    at scala.util.Try.getOrElse(Try.scala:79)

corresponding line in the Spark sources 是问题的“根本原因”:

Timestamp.valueOf(s)

阅读了javadoc of Timestamp.valueOf,您可以了解到参数应该是:

yyyy-[m]m-[d]d hh:mm:ss[.f...] 格式的时间戳。小数秒可以省略。 mm 和 dd 的前导零也可以省略。

注意“小数秒可能会被省略”,所以让我们先将 EventDate 作为字符串加载,然后仅在删除不需要的小数秒后将其转换为时间戳来切断它。

val eventsAsString = spark.read.format("csv")
  .option("header", true)
  .option("mode","FAILFAST")
  .option("delimiter","|")
  .load("so-43259485.csv")

事实证明,for fields of TimestampType type Spark uses timestampFormat option 首先如果定义且仅在未定义时使用 the code the uses Timestamp.valueOf

事实证明,解决方法只是使用timestampFormat 选项(而不是dateFormat!)。

val df = spark.read
  .format("com.databricks.spark.csv")
  .option("header","true")
  .option("delimiter","|")
  .option("mode","FAILFAST")
  .option("inferSchema","false")
  .option("timestampFormat","yyyy/MM/dd HH:mm:ss.SSS")
  .schema(sourceSchemaStruct)
  .load(sourceFile)
scala> df.show(false)
+-----------------------+----+
|EventDate              |Name|
+-----------------------+----+
|2016-12-19 00:43:27.583|adam|
+-----------------------+----+

火花 2.1.0

在 CSV 中使用 inferSchema 选项和您的自定义 timestampFormat 进行架构推断。

使用inferSchema 触发架构推断以使timestampFormat 生效非常重要。

val events = spark.read
  .format("csv")
  .option("header", true)
  .option("mode","FAILFAST")
  .option("delimiter","|")
  .option("inferSchema", true)
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
  .load("so-43259485.csv")

scala> events.show(false)
+-------------------+----+
|EventDate          |Name|
+-------------------+----+
|2016-12-19 00:43:27|adam|
+-------------------+----+

scala> events.printSchema
root
 |-- EventDate: timestamp (nullable = true)
 |-- Name: string (nullable = true)

“不正确”初始版本留作学习之用

val events = eventsAsString
  .withColumn("date", split($"EventDate", " ")(0))
  .withColumn("date", translate($"date", "/", "-"))
  .withColumn("time", split($"EventDate", " ")(1))
  .withColumn("time", split($"time", "[.]")(0))    // <-- remove millis part
  .withColumn("EventDate", concat($"date", lit(" "), $"time")) // <-- make EventDate right
  .select($"EventDate" cast "timestamp", $"Name")

scala> events.printSchema
root
 |-- EventDate: timestamp (nullable = true)
 |-- Name: string (nullable = true)
    events.show(false)

scala> events.show
+-------------------+----+
|          EventDate|Name|
+-------------------+----+
|2016-12-19 00:43:27|adam|
+-------------------+----+

火花 2.2.0

从 Spark 2.2 开始,您可以使用 to_timestamp 函数进行字符串到时间戳的转换。

eventsAsString.select($"EventDate", to_timestamp($"EventDate", "yyyy/MM/dd HH:mm:ss.SSS")).show(false)

scala> eventsAsString.select($"EventDate", to_timestamp($"EventDate", "yyyy/MM/dd HH:mm:ss.SSS")).show(false)
+-----------------------+----------------------------------------------------+
|EventDate              |to_timestamp(`EventDate`, 'yyyy/MM/dd HH:mm:ss.SSS')|
+-----------------------+----------------------------------------------------+
|2016/12/19 00:43:27.583|2016-12-19 00:43:27                                 |
+-----------------------+----------------------------------------------------+

【讨论】:

任何帮助请***.com/questions/55965978/… 如果我们想在同一个文件中解析多个时间戳格式怎么办,例如,在我的 csv 文件中,我有一些时间戳,如“dd/MM/yyyy”和“dd-mm-yyyy”我希望能够将两者都解析为时间戳,我尝试了这个 .Option("TimeStampFormat", "dd/MM/yyyy, dd-MM-yyyy") 但它不起作用 @Pugnatore 不适用于不一致的时间戳格式。您必须创建一个 UDF 才能自己进行解析。【参考方案2】:

我搜索了这个问题,发现了官方 Github 问题页面https://github.com/databricks/spark-csv/pull/280,它修复了一个相关的错误,用于解析具有自定义日期格式的数据。我查看了一些源代码,并根据code找出您的问题原因,设置为inferSchema,默认值为false,如下所示。

inferSchema:自动推断列类型。它需要对数据进行一次额外的传递,并且默认为 false

请将inferSchema 更改为true 为您的日期格式yyyy/MM/dd HH:mm:ss.SSS 使用SimpleDateFormat

【讨论】:

我修改了我的代码如下并运行它。它仍然产生相同的错误 val df = spark.read.format("com.databricks.spark.csv").option("header","true").option("delimiter","|").option ("mode","FAILFAST").option("inferSchema","true").option("dateFormat","yyyy/MM/dd HH:mm:ss.SSS").schema(sourceSchemaStruct).load(源文件)

以上是关于如何以自定义格式加载带有时间戳的 CSV?的主要内容,如果未能解决你的问题,请参考以下文章

如何以自定义 json 格式返回数据?

使用 bq 将数据加载到 BigQuery 的自定义日期格式?

Ninja Framework 以自定义格式返回 JSON

如何以自定义漂亮的日志格式在每个新行上附加制表符?

如何强制Python XlsxWriter以自定义格式写入单元格

从 QTime 继承以自定义时间格式