在 Spark 中将字符串字段转换为时间戳的更好方法

Posted

技术标签:

【中文标题】在 Spark 中将字符串字段转换为时间戳的更好方法【英文标题】:Better way to convert a string field into timestamp in Spark 【发布时间】:2015-04-24 09:45:27 【问题描述】:

我有一个 CSV,其中一个字段是特定格式的日期时间。我不能直接在我的 Dataframe 中导入它,因为它需要是一个时间戳。所以我将它作为字符串导入并像这样将其转换为Timestamp

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.Row

def getTimestamp(x:Any) : Timestamp = 
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
    if (x.toString() == "") 
    return null
    else 
        val d = format.parse(x.toString());
        val t = new Timestamp(d.getTime());
        return t
    


def convert(row : Row) : Row = 
    val d1 = getTimestamp(row(3))
    return Row(row(0),row(1),row(2),d1)

使用 Dataframe API 或 spark-sql 是否有更好、更简洁的方法来执行此操作?上述方法需要创建一个 RDD 并再次为 Dataframe 提供架构。

【问题讨论】:

【参考方案1】:

火花 >= 2.2

从你2.2开始可以直接提供格式字符串:

import org.apache.spark.sql.functions.to_timestamp

val ts = to_timestamp($"dts", "MM/dd/yyyy HH:mm:ss")

df.withColumn("ts", ts).show(2, false)

// +---+-------------------+-------------------+
// |id |dts                |ts                 |
// +---+-------------------+-------------------+
// |1  |05/26/2016 01:01:01|2016-05-26 01:01:01|
// |2  |#$@#@#             |null               |
// +---+-------------------+-------------------+

火花 >= 1.6,

您可以使用 Spark 1.5 中引入的日期处理功能。假设您有以下数据:

val df = Seq((1L, "05/26/2016 01:01:01"), (2L, "#$@#@#")).toDF("id", "dts")

您可以使用unix_timestamp 解析字符串并将其转换为时间戳

import org.apache.spark.sql.functions.unix_timestamp

val ts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("timestamp")

df.withColumn("ts", ts).show(2, false)

// +---+-------------------+---------------------+
// |id |dts                |ts                   |
// +---+-------------------+---------------------+
// |1  |05/26/2016 01:01:01|2016-05-26 01:01:01.0|
// |2  |#$@#@#             |null                 |
// +---+-------------------+---------------------+

如您所见,它涵盖了解析和错误处理。格式字符串应与 Java SimpleDateFormat 兼容。

火花 >= 1.5,

你必须使用这样的东西:

unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("double").cast("timestamp")

(unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") * 1000).cast("timestamp")

由于SPARK-11724。

火花

您应该可以将它们与exprHiveContext 一起使用。

【讨论】:

to_timestamp 似乎被设计为出于某种原因丢弃毫秒信息。 @user2357112supportsMonica:是的,你是对的。可能我们必须用 java SimpleDateFormat 编写自定义 udf 来实现这一点。在某些情况下,来自 spark 函数的 date_formatdate_format 很方便。【参考方案2】:

我还没有玩过 Spark SQL,但我认为这将是更惯用的 scala(null 使用不被认为是一种好的做法):

def getTimestamp(s: String) : Option[Timestamp] = s match 
  case "" => None
  case _ => 
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
    Try(new Timestamp(format.parse(s).getTime)) match 
      case Success(t) => Some(t)
      case Failure(_) => None
        
  

请注意,我假设您事先知道 Row 元素类型(如果您从 csv 文件中读取,它们都是 String),这就是为什么我使用像 String 这样的正确类型而不是 Any(一切都是Any的子类型。

这还取决于您希望如何处理解析异常。在这种情况下,如果发生解析异常,则简单地返回一个None

您可以进一步使用它:

rows.map(row => Row(row(0),row(1),row(2), getTimestamp(row(3))

【讨论】:

我以前做过这个。我觉得我应该先解决核心问题,然后再讨论这些细节。如果有更好的解决方案,可能根本不必这样做。问题在于返回 rdd 并且需要转换为 ddf 的 rows.map。所以可能是缺少 ddf api 或者我不知道该怎么做。 我不知道是否有其他方法,但是您可以毫无问题地将任何 RDD 转换为 DF。在这个带有sqlContext.createDataFrame(rowRDD, schema) 的具体示例中。对我来说,spark sql 很适合以类似 SQL 的方式查询数据,而不是解析数据本身(对于这样的事情,使用简单的 RDD)。 Try(new Timestamp(format.parse(s).getTime)).toOption【参考方案3】:

我的数据集中有 ISO8601 时间戳,我需要将其转换为“yyyy-MM-dd”格式。这就是我所做的:

import org.joda.time.DateTime, DateTimeZone
object DateUtils extends Serializable 
  def dtFromUtcSeconds(seconds: Int): DateTime = new DateTime(seconds * 1000L, DateTimeZone.UTC)
  def dtFromIso8601(isoString: String): DateTime = new DateTime(isoString, DateTimeZone.UTC)


sqlContext.udf.register("formatTimeStamp", (isoTimestamp : String) => DateUtils.dtFromIso8601(isoTimestamp).toString("yyyy-MM-dd"))

您可以在 Spark SQL 查询中使用 UDF。

【讨论】:

【参考方案4】:

Spark 版本:2.4.4

scala> import org.apache.spark.sql.types.TimestampType
import org.apache.spark.sql.types.TimestampType

scala> val df = Seq("2019-04-01 08:28:00").toDF("ts")
df: org.apache.spark.sql.DataFrame = [ts: string]

scala> val df_mod = df.select($"ts".cast(TimestampType))
df_mod: org.apache.spark.sql.DataFrame = [ts: timestamp]

scala> df_mod.printSchema()
root
 |-- ts: timestamp (nullable = true)

【讨论】:

【参考方案5】:

我想把你写的getTimeStamp方法移到rdd的mapPartitions中,在迭代器的行间复用GenericMutableRow:

val strRdd = sc.textFile("hdfs://path/to/cvs-file")
val rowRdd: RDD[Row] = strRdd.map(_.split('\t')).mapPartitions  iter =>
  new Iterator[Row] 
    val row = new GenericMutableRow(4)
    var current: Array[String] = _

    def hasNext = iter.hasNext
    def next() = 
      current = iter.next()
      row(0) = current(0)
      row(1) = current(1)
      row(2) = current(2)

      val ts = getTimestamp(current(3))
      if(ts != null) 
        row.update(3, ts)
       else 
        row.setNullAt(3)
      
      row
    
  

而且你还是应该使用 schema 来生成 DataFrame

val df = sqlContext.createDataFrame(rowRdd, tableSchema)

在迭代器实现中使用 GenericMutableRow 可以在 Aggregate Operator、InMemoryColumnarTableScan、ParquetTableOperations 等中找到。

【讨论】:

非常接近我的实际代码。此外,如果您想解析 csv 文件,您可能应该使用 spark-csv 而不是 split。我想说的一点是添加和变异列将返回一个 rdd ,这将再次需要通过提供模式转换为 ddf。有没有更短的路线。 @user568109,我认为没有。由于 spark-sql 需要一个模式,它必须以某种方式获得一个。如果您使用 RDD[CaseClassX],spark-sql 会根据案例类的定义自动为您推断架构。但是你在这里使用的是一个 Row(Array[Any]),没有 DataType 推断可以去那里,所以你只传递一个。 我认为使用一个引用,每次都对其进行变异并将其作为引用返回是灾难的根源。你真的成功地使用了这种方法吗? @YijieShen 我的立场是正确的。这个“mutableRow”看起来像这里解释的内存优化:github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/… 你能指出他们推荐这种方法的资源吗?看起来像是凯利板想要在幕后做的事情。 +1【参考方案6】:

我会使用https://github.com/databricks/spark-csv

这将为您推断时间戳。

import com.databricks.spark.csv._
val rdd: RDD[String] = sc.textFile("csvfile.csv")

val df : DataFrame = new CsvParser().withDelimiter('|')
      .withInferSchema(true)
      .withParseMode("DROPMALFORMED")
      .csvRdd(sqlContext, rdd)

【讨论】:

【参考方案7】:

我遇到了一些关于 to_timestamp 的问题,它返回一个空字符串。经过大量的试验和错误,我能够通过转换为时间戳来绕过它,然后再转换为字符串。我希望这对遇到相同问题的其他人有所帮助:

df.columns.intersect(cols).foldLeft(df)((newDf, col) => 
  val conversionFunc = to_timestamp(newDf(col).cast("timestamp"), "MM/dd/yyyy HH:mm:ss").cast("string")
  newDf.withColumn(col, conversionFunc)
)

【讨论】:

以上是关于在 Spark 中将字符串字段转换为时间戳的更好方法的主要内容,如果未能解决你的问题,请参考以下文章

php怎么将指定日期转换为时间戳

在scala中将时间字符串转换为时间戳/日期时间

在 Hive 中将 Long 转换为时间戳

在 Hive 中将字符串转换为时间戳

在 Impala 中将连接的字符串转换为时间戳

在Android中将字符串日期转换为时间戳?