spark scala比较具有时间戳列的数据帧
Posted
技术标签:
【中文标题】spark scala比较具有时间戳列的数据帧【英文标题】:spark scala compare dataframes having timestamp column 【发布时间】:2020-08-23 14:53:58 【问题描述】:我正在尝试比较 2 组数据。一个是数据帧,一组静态数据并以 Avro 格式写入。现在,这种比较从 Avro 读取并检查哪个具有时间戳列,并且比较失败,因为 Avro 将数据存储为 Long 并且 sql 类型转换给出了不同的值
**--CREATE THE DATAFRAME**
val data = Seq(Row("1",java.sql.Timestamp.valueOf("2019-03-15 18:20:06.456")))
val schemaOrig = List( StructField("rowkey",StringType,true)
,StructField("txn_ts",TimestampType,true))
val sourceDf = spark.createDataFrame(spark.sparkContext.parallelize(data),StructType(schemaOrig))
sourceDf.write.avro("test")
sourceDf.printSchema
root
|-- rowkey: string (nullable = true)
|-- txn_ts: timestamp (nullable = true)
sourceDf.show(false)
+----------------+-----------------------+
|rowkey |txn_ts |
+----------------+-----------------------+
|1 |2019-03-15 18:20:06.456|
+----------------+-----------------------+
--As shown above the avro file has the expected schema specified ie String and Timestamp
--Now Read the data back from Avro
val avroDf=spark.read.avro("test")
avroDf.printSchema
root
|-- rowkey: string (nullable = true)
|-- txn_ts: long (nullable = true)
avroDf.show(false)
--Avro Df schema is printing the timestamp field as long and data showing epoch time
+----------------+-------------+
|rowkey |txn_ts |
+----------------+-------------+
|1 |1552688406456|
+----------------+-------------+
compare the 2 Df
sourceDf.except(avroDf).show(false)
--Gives error due to datatype mismatch
org.apache.spark.sql.AnalysisException: Except can only be performed on tables with the compatible column types. bigint <> timestamp at the second column of the second table;;
'Except
:- AnalysisBarrier
CAST the avro data long field back to time
stamp
val modifiedAvroDf=avroDf.withColumn("txn_ts", col("txn_ts").cast(TimestampType))
modifiedAvroDf.printSchema
|-- rowkey: string (nullable = true)
|-- txn_ts: timestamp (nullable = true)
modifiedAvroDf.show(false)
--Showing wrong timestamp value
+----------------+-----------------------+
|rowkey |txn_ts |
+----------------+-----------------------+
|1 |51172-09-26 11:07:366.0|
+----------------+-----------------------+
--Now Try to cast the source column to long
val sourceModDf=sourceDf.withColumn("txn_ts",col("txn_ts").cast(LongType))
sourceModDf.printSchema
|-- rowkey: string (nullable = true)
|-- txn_ts: long (nullable = true)
sourceModDf.show(false)
sourceModDf.except(modifiedAvroDf).show(false)
【问题讨论】:
【参考方案1】:创建 UDF 以将 long 转换为时间戳字符串。请检查以下代码。
scala> val df = Seq(1552688406456L).toDF
df: org.apache.spark.sql.DataFrame = [value: bigint]
scala> import org.joda.time.DateTime
import org.joda.time.DateTime
scala> import org.joda.time.DateTimeZone
import org.joda.time.DateTimeZone
scala> val datetime = udf((date: Long) => new DateTime(date, DateTimeZone.UTC).toString.replace("Z","").replace("T"," "))
datetime: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(LongType)))
scala> df.select(datetime($"value").as("dt")).show(false)
+------------------------+
|dt |
+------------------------+
|2019-03-15 22:20:06.456 |
+------------------------+
scala> df.select(datetime($"value").as("dt").cast("timestamp")).show(false)
+-----------------------+
|dt |
+-----------------------+
|2019-03-15 22:20:06.456|
+-----------------------+
scala> df.select(datetime($"value").as("dt").cast("timestamp")).printSchema
root
|-- dt: timestamp (nullable = true)
【讨论】:
非常感谢。如何使它通用?就像在我的情况下,不同的表有不同的列集,我像 val modifiedDf = filter._2.fields.foldLeft(sourceDF) case (newDf, SchemaField(name, fieldType)) => newDf.withColumn(name, newDf.col(name) .cast(DataTypeUtil.mapDataType(fieldType))) datetime 是 udf,在某个顶层声明。像这样在需要时使用它 - newDf.withColumn("ts",datetime($"ts")).withColumn(name, newDf.col(name) .cast(DataTypeUtil.mapDataType(fieldType)) 并非如此。当它尝试使用 udf 的通用方法时,值不会被更改。请看帖子***.com/questions/61708010/… 嗨 Srinivas 时区存在问题,因为您看到返回值为 2019-03-16T03:50:06.456+05:30,而原始时间戳为 2019-03-15 18:20: 06.456。我猜这个问题是由于时区造成的。有没有办法强制使用相同的时区,以便存储的 Long 值应该被读取到完全相同的时间戳 更新了转换UTC时区的答案,如果您愿意,可以将其更改为不同的时区,如果有助于解决问题,请接受或投票。以上是关于spark scala比较具有时间戳列的数据帧的主要内容,如果未能解决你的问题,请参考以下文章
通过读取具有不同数据类型的 Scala 序列来创建 Spark 数据帧
Apache Spark:迭代数据帧行并通过 MutableList (Scala) 创建新数据帧