如何以自定义格式加载带有时间戳的 CSV?
Posted
技术标签:
【中文标题】如何以自定义格式加载带有时间戳的 CSV?【英文标题】:How to load CSVs with timestamps in custom format? 【发布时间】:2017-04-06 15:28:09 【问题描述】:我在 csv 文件中有一个时间戳字段,我使用 spark csv 库将其加载到数据帧中。同一段代码在我的本地计算机上运行 Spark 2.0 版本,但在 Azure Hortonworks HDP 3.5 和 3.6 上引发错误。
我检查过,Azure HDInsight 3.5 也使用相同的 Spark 版本,所以我认为这不是 Spark 版本的问题。
import org.apache.spark.sql.types._
val sourceFile = "C:\\2017\\datetest"
val sourceSchemaStruct = new StructType()
.add("EventDate",DataTypes.TimestampType)
.add("Name",DataTypes.StringType)
val df = spark.read
.format("com.databricks.spark.csv")
.option("header","true")
.option("delimiter","|")
.option("mode","FAILFAST")
.option("inferSchema","false")
.option("dateFormat","yyyy/MM/dd HH:mm:ss.SSS")
.schema(sourceSchemaStruct)
.load(sourceFile)
整个异常如下:
Caused by: java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
at java.sql.Timestamp.valueOf(Timestamp.java:237)
at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:179)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply$mcJ$sp(UnivocityParser.scala:142)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply(UnivocityParser.scala:142)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply(UnivocityParser.scala:142)
at scala.util.Try.getOrElse(Try.scala:79)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13.apply(UnivocityParser.scala:139)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13.apply(UnivocityParser.scala:135)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$nullSafeDatum(UnivocityParser.scala:179)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9.apply(UnivocityParser.scala:135)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9.apply(UnivocityParser.scala:134)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$convert(UnivocityParser.scala:215)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.parse(UnivocityParser.scala:187)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:304)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:304)
at org.apache.spark.sql.execution.datasources.FailureSafeParser.parse(FailureSafeParser.scala:61)
... 27 more
csv文件只有一行如下:
"EventDate"|"Name"
"2016/12/19 00:43:27.583"|"adam"
【问题讨论】:
【参考方案1】:TL;DR 使用timestampFormat
选项(不是dateFormat
)。
我已经成功地在最新的 Spark 版本 2.3.0-SNAPSHOT 中重现了它(由 master 构建)。
// OS shell
$ cat so-43259485.csv
"EventDate"|"Name"
"2016/12/19 00:43:27.583"|"adam"
// spark-shell
scala> spark.version
res1: String = 2.3.0-SNAPSHOT
case class Event(EventDate: java.sql.Timestamp, Name: String)
import org.apache.spark.sql.Encoders
val schema = Encoders.product[Event].schema
scala> spark
.read
.format("csv")
.option("header", true)
.option("mode","FAILFAST")
.option("delimiter","|")
.schema(schema)
.load("so-43259485.csv")
.show(false)
17/04/08 11:03:42 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 7)
java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
at java.sql.Timestamp.valueOf(Timestamp.java:237)
at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:167)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply$mcJ$sp(UnivocityParser.scala:146)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply(UnivocityParser.scala:146)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply(UnivocityParser.scala:146)
at scala.util.Try.getOrElse(Try.scala:79)
corresponding line in the Spark sources 是问题的“根本原因”:
Timestamp.valueOf(s)
阅读了javadoc of Timestamp.valueOf,您可以了解到参数应该是:
yyyy-[m]m-[d]d hh:mm:ss[.f...]
格式的时间戳。小数秒可以省略。 mm 和 dd 的前导零也可以省略。
注意“小数秒可能会被省略”,所以让我们先将 EventDate 作为字符串加载,然后仅在删除不需要的小数秒后将其转换为时间戳来切断它。
val eventsAsString = spark.read.format("csv")
.option("header", true)
.option("mode","FAILFAST")
.option("delimiter","|")
.load("so-43259485.csv")
事实证明,for fields of TimestampType
type Spark uses timestampFormat
option 首先如果定义且仅在未定义时使用 the code the uses Timestamp.valueOf
。
事实证明,解决方法只是使用timestampFormat
选项(而不是dateFormat
!)。
val df = spark.read
.format("com.databricks.spark.csv")
.option("header","true")
.option("delimiter","|")
.option("mode","FAILFAST")
.option("inferSchema","false")
.option("timestampFormat","yyyy/MM/dd HH:mm:ss.SSS")
.schema(sourceSchemaStruct)
.load(sourceFile)
scala> df.show(false)
+-----------------------+----+
|EventDate |Name|
+-----------------------+----+
|2016-12-19 00:43:27.583|adam|
+-----------------------+----+
火花 2.1.0
在 CSV 中使用 inferSchema
选项和您的自定义 timestampFormat
进行架构推断。
使用inferSchema
触发架构推断以使timestampFormat
生效非常重要。
val events = spark.read
.format("csv")
.option("header", true)
.option("mode","FAILFAST")
.option("delimiter","|")
.option("inferSchema", true)
.option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
.load("so-43259485.csv")
scala> events.show(false)
+-------------------+----+
|EventDate |Name|
+-------------------+----+
|2016-12-19 00:43:27|adam|
+-------------------+----+
scala> events.printSchema
root
|-- EventDate: timestamp (nullable = true)
|-- Name: string (nullable = true)
“不正确”初始版本留作学习之用
val events = eventsAsString
.withColumn("date", split($"EventDate", " ")(0))
.withColumn("date", translate($"date", "/", "-"))
.withColumn("time", split($"EventDate", " ")(1))
.withColumn("time", split($"time", "[.]")(0)) // <-- remove millis part
.withColumn("EventDate", concat($"date", lit(" "), $"time")) // <-- make EventDate right
.select($"EventDate" cast "timestamp", $"Name")
scala> events.printSchema
root
|-- EventDate: timestamp (nullable = true)
|-- Name: string (nullable = true)
events.show(false)
scala> events.show
+-------------------+----+
| EventDate|Name|
+-------------------+----+
|2016-12-19 00:43:27|adam|
+-------------------+----+
火花 2.2.0
从 Spark 2.2 开始,您可以使用 to_timestamp
函数进行字符串到时间戳的转换。
eventsAsString.select($"EventDate", to_timestamp($"EventDate", "yyyy/MM/dd HH:mm:ss.SSS")).show(false)
scala> eventsAsString.select($"EventDate", to_timestamp($"EventDate", "yyyy/MM/dd HH:mm:ss.SSS")).show(false)
+-----------------------+----------------------------------------------------+
|EventDate |to_timestamp(`EventDate`, 'yyyy/MM/dd HH:mm:ss.SSS')|
+-----------------------+----------------------------------------------------+
|2016/12/19 00:43:27.583|2016-12-19 00:43:27 |
+-----------------------+----------------------------------------------------+
【讨论】:
任何帮助请***.com/questions/55965978/… 如果我们想在同一个文件中解析多个时间戳格式怎么办,例如,在我的 csv 文件中,我有一些时间戳,如“dd/MM/yyyy”和“dd-mm-yyyy”我希望能够将两者都解析为时间戳,我尝试了这个 .Option("TimeStampFormat", "dd/MM/yyyy, dd-MM-yyyy") 但它不起作用 @Pugnatore 不适用于不一致的时间戳格式。您必须创建一个 UDF 才能自己进行解析。【参考方案2】:我搜索了这个问题,发现了官方 Github 问题页面https://github.com/databricks/spark-csv/pull/280,它修复了一个相关的错误,用于解析具有自定义日期格式的数据。我查看了一些源代码,并根据code找出您的问题原因,设置为inferSchema
,默认值为false
,如下所示。
inferSchema
:自动推断列类型。它需要对数据进行一次额外的传递,并且默认为 false
请将inferSchema
更改为true
为您的日期格式yyyy/MM/dd HH:mm:ss.SSS
使用SimpleDateFormat
。
【讨论】:
我修改了我的代码如下并运行它。它仍然产生相同的错误 val df = spark.read.format("com.databricks.spark.csv").option("header","true").option("delimiter","|").option ("mode","FAILFAST").option("inferSchema","true").option("dateFormat","yyyy/MM/dd HH:mm:ss.SSS").schema(sourceSchemaStruct).load(源文件)以上是关于如何以自定义格式加载带有时间戳的 CSV?的主要内容,如果未能解决你的问题,请参考以下文章
使用 bq 将数据加载到 BigQuery 的自定义日期格式?