在 Spark 中将字符串字段转换为时间戳的更好方法
Posted
技术标签:
【中文标题】在 Spark 中将字符串字段转换为时间戳的更好方法【英文标题】:Better way to convert a string field into timestamp in Spark 【发布时间】:2015-04-24 09:45:27 【问题描述】:我有一个 CSV,其中一个字段是特定格式的日期时间。我不能直接在我的 Dataframe 中导入它,因为它需要是一个时间戳。所以我将它作为字符串导入并像这样将其转换为Timestamp
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.Row
def getTimestamp(x:Any) : Timestamp =
val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
if (x.toString() == "")
return null
else
val d = format.parse(x.toString());
val t = new Timestamp(d.getTime());
return t
def convert(row : Row) : Row =
val d1 = getTimestamp(row(3))
return Row(row(0),row(1),row(2),d1)
使用 Dataframe API 或 spark-sql 是否有更好、更简洁的方法来执行此操作?上述方法需要创建一个 RDD 并再次为 Dataframe 提供架构。
【问题讨论】:
【参考方案1】:火花 >= 2.2
从你2.2开始可以直接提供格式字符串:
import org.apache.spark.sql.functions.to_timestamp
val ts = to_timestamp($"dts", "MM/dd/yyyy HH:mm:ss")
df.withColumn("ts", ts).show(2, false)
// +---+-------------------+-------------------+
// |id |dts |ts |
// +---+-------------------+-------------------+
// |1 |05/26/2016 01:01:01|2016-05-26 01:01:01|
// |2 |#$@#@# |null |
// +---+-------------------+-------------------+
火花 >= 1.6,
您可以使用 Spark 1.5 中引入的日期处理功能。假设您有以下数据:
val df = Seq((1L, "05/26/2016 01:01:01"), (2L, "#$@#@#")).toDF("id", "dts")
您可以使用unix_timestamp
解析字符串并将其转换为时间戳
import org.apache.spark.sql.functions.unix_timestamp
val ts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("timestamp")
df.withColumn("ts", ts).show(2, false)
// +---+-------------------+---------------------+
// |id |dts |ts |
// +---+-------------------+---------------------+
// |1 |05/26/2016 01:01:01|2016-05-26 01:01:01.0|
// |2 |#$@#@# |null |
// +---+-------------------+---------------------+
如您所见,它涵盖了解析和错误处理。格式字符串应与 Java SimpleDateFormat
兼容。
火花 >= 1.5,
你必须使用这样的东西:
unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("double").cast("timestamp")
或
(unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") * 1000).cast("timestamp")
由于SPARK-11724。
火花
您应该可以将它们与expr
和HiveContext
一起使用。
【讨论】:
to_timestamp
似乎被设计为出于某种原因丢弃毫秒信息。
@user2357112supportsMonica:是的,你是对的。可能我们必须用 java SimpleDateFormat
编写自定义 udf 来实现这一点。在某些情况下,来自 spark 函数的 date_format
或 date_format
很方便。【参考方案2】:
我还没有玩过 Spark SQL,但我认为这将是更惯用的 scala(null 使用不被认为是一种好的做法):
def getTimestamp(s: String) : Option[Timestamp] = s match
case "" => None
case _ =>
val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
Try(new Timestamp(format.parse(s).getTime)) match
case Success(t) => Some(t)
case Failure(_) => None
请注意,我假设您事先知道 Row
元素类型(如果您从 csv 文件中读取,它们都是 String
),这就是为什么我使用像 String
这样的正确类型而不是 Any
(一切都是Any
的子类型。
这还取决于您希望如何处理解析异常。在这种情况下,如果发生解析异常,则简单地返回一个None
。
您可以进一步使用它:
rows.map(row => Row(row(0),row(1),row(2), getTimestamp(row(3))
【讨论】:
我以前做过这个。我觉得我应该先解决核心问题,然后再讨论这些细节。如果有更好的解决方案,可能根本不必这样做。问题在于返回 rdd 并且需要转换为 ddf 的 rows.map。所以可能是缺少 ddf api 或者我不知道该怎么做。 我不知道是否有其他方法,但是您可以毫无问题地将任何 RDD 转换为 DF。在这个带有sqlContext.createDataFrame(rowRDD, schema)
的具体示例中。对我来说,spark sql 很适合以类似 SQL 的方式查询数据,而不是解析数据本身(对于这样的事情,使用简单的 RDD)。
Try(new Timestamp(format.parse(s).getTime)).toOption【参考方案3】:
我的数据集中有 ISO8601 时间戳,我需要将其转换为“yyyy-MM-dd”格式。这就是我所做的:
import org.joda.time.DateTime, DateTimeZone
object DateUtils extends Serializable
def dtFromUtcSeconds(seconds: Int): DateTime = new DateTime(seconds * 1000L, DateTimeZone.UTC)
def dtFromIso8601(isoString: String): DateTime = new DateTime(isoString, DateTimeZone.UTC)
sqlContext.udf.register("formatTimeStamp", (isoTimestamp : String) => DateUtils.dtFromIso8601(isoTimestamp).toString("yyyy-MM-dd"))
您可以在 Spark SQL 查询中使用 UDF。
【讨论】:
【参考方案4】:Spark 版本:2.4.4
scala> import org.apache.spark.sql.types.TimestampType
import org.apache.spark.sql.types.TimestampType
scala> val df = Seq("2019-04-01 08:28:00").toDF("ts")
df: org.apache.spark.sql.DataFrame = [ts: string]
scala> val df_mod = df.select($"ts".cast(TimestampType))
df_mod: org.apache.spark.sql.DataFrame = [ts: timestamp]
scala> df_mod.printSchema()
root
|-- ts: timestamp (nullable = true)
【讨论】:
【参考方案5】:我想把你写的getTimeStamp
方法移到rdd的mapPartitions中,在迭代器的行间复用GenericMutableRow:
val strRdd = sc.textFile("hdfs://path/to/cvs-file")
val rowRdd: RDD[Row] = strRdd.map(_.split('\t')).mapPartitions iter =>
new Iterator[Row]
val row = new GenericMutableRow(4)
var current: Array[String] = _
def hasNext = iter.hasNext
def next() =
current = iter.next()
row(0) = current(0)
row(1) = current(1)
row(2) = current(2)
val ts = getTimestamp(current(3))
if(ts != null)
row.update(3, ts)
else
row.setNullAt(3)
row
而且你还是应该使用 schema 来生成 DataFrame
val df = sqlContext.createDataFrame(rowRdd, tableSchema)
在迭代器实现中使用 GenericMutableRow 可以在 Aggregate Operator、InMemoryColumnarTableScan、ParquetTableOperations 等中找到。
【讨论】:
非常接近我的实际代码。此外,如果您想解析 csv 文件,您可能应该使用 spark-csv 而不是 split。我想说的一点是添加和变异列将返回一个 rdd ,这将再次需要通过提供模式转换为 ddf。有没有更短的路线。 @user568109,我认为没有。由于 spark-sql 需要一个模式,它必须以某种方式获得一个。如果您使用 RDD[CaseClassX],spark-sql 会根据案例类的定义自动为您推断架构。但是你在这里使用的是一个 Row(Array[Any]),没有 DataType 推断可以去那里,所以你只传递一个。 我认为使用一个引用,每次都对其进行变异并将其作为引用返回是灾难的根源。你真的成功地使用了这种方法吗? @YijieShen 我的立场是正确的。这个“mutableRow”看起来像这里解释的内存优化:github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/… 你能指出他们推荐这种方法的资源吗?看起来像是凯利板想要在幕后做的事情。 +1【参考方案6】:我会使用https://github.com/databricks/spark-csv
这将为您推断时间戳。
import com.databricks.spark.csv._
val rdd: RDD[String] = sc.textFile("csvfile.csv")
val df : DataFrame = new CsvParser().withDelimiter('|')
.withInferSchema(true)
.withParseMode("DROPMALFORMED")
.csvRdd(sqlContext, rdd)
【讨论】:
【参考方案7】:我遇到了一些关于 to_timestamp 的问题,它返回一个空字符串。经过大量的试验和错误,我能够通过转换为时间戳来绕过它,然后再转换为字符串。我希望这对遇到相同问题的其他人有所帮助:
df.columns.intersect(cols).foldLeft(df)((newDf, col) =>
val conversionFunc = to_timestamp(newDf(col).cast("timestamp"), "MM/dd/yyyy HH:mm:ss").cast("string")
newDf.withColumn(col, conversionFunc)
)
【讨论】:
以上是关于在 Spark 中将字符串字段转换为时间戳的更好方法的主要内容,如果未能解决你的问题,请参考以下文章