如何在 row_number() 列上应用转换
Posted
技术标签:
【中文标题】如何在 row_number() 列上应用转换【英文标题】:How to apply tranformation on row_number() column 【发布时间】:2020-09-21 16:39:32 【问题描述】:需要应用转换,以便将分区组的 created_at 字段的第一个值添加到整个分区组的新列 startDate 中。
其次,每当 tg 更改“type”和“key”列的相同值时,新列的 created_at 字段应该成为其上方具有相同“type”和“key”的行的 endDate,否则它保持为空。
type key tg created_at timestamp row_number
device_id essentials template 1600269347 2020-09-21 19:08:05 1
device_id experiment t1 1599721314 2020-09-17 01:37:17 1
device_id experiment v1 1600228007 2020-09-21 18:07:53 2
device_id experiment c1 1605221085 2020-09-21 18:07:53 3
test t_key t1 1599714939 2020-09-16 01:37:55 1
test t_key t2 1600084857 2020-09-21 17:08:23 2
到目前为止的应用步骤-: val windowSpec = Window.partitionBy("type","key").orderBy("timestamp") test.withColumn("row_number",row_number.over(windowSpec)).show()
预期输出-:
type key tg created_at timestamp row_number startDate endDate
device_id essentials template 1600269347 2020-09-21 19:08:05 1 1600269347 null
device_id experiment t1 1599721314 2020-09-17 01:37:17 1 1599721314 1600228007
device_id experiment v1 1600228007 2020-09-21 18:07:53 2 1599721314 1605221085
device_id experiment c1 1605221085 2020-09-21 18:07:53 3 1599721314 null
test t_key t1 1599714939 2020-09-16 01:37:55 1 1599714939 1600084857
test t_key t2 1600084857 2020-09-21 17:08:23 2 1599714939 null
关于如何进行的任何建议?
【问题讨论】:
【参考方案1】:您可以在窗口上使用first
来获取 created_at 的第一个值。 min
在这种情况下也可以使用。
第二个有点棘手。您需要使用lag
并记住窗口中最后一行的滞后结果将始终为空。
val schema = List(
StructField("type", StringType, true),
StructField("key", StringType, true),
StructField("tg", StringType, true),
StructField("created_at", IntegerType, true),
StructField("timestamp", TimestampType, true),
StructField("row_number", IntegerType, true)
)
val data = Seq(
Row("device_id", "essentials", "template", 1600269347, Timestamp.valueOf("2020-09-21 19:08:05"), 1),
Row("device_id", "experiment", "t1", 1599721314, Timestamp.valueOf("2020-09-17 01:37:17"), 1),
Row("device_id", "experiment", "v1", 1600228007, Timestamp.valueOf("2020-09-21 18:07:53"), 2),
Row("device_id", "experiment", "c1", 1605221085, Timestamp.valueOf("2020-09-21 18:07:53"), 3),
Row("test", "t_key", "t1", 1599714939, Timestamp.valueOf("2020-09-16 01:37:55"), 1),
Row("test", "t_key", "t2", 1600084857, Timestamp.valueOf("2020-09-21 17:08:23"), 2)
)
val test = spark.createDataFrame(spark.sparkContext.parallelize(data), StructType(schema))
val windowSpec = Window.partitionBy("type","key").orderBy("timestamp")
test
.withColumn("startDate", first(col("created_at")).over(windowSpec))
.withColumn("endDate", when(
lead(col("tg"), 1).over(windowSpec).isNotNull &&
lead(col("tg"), 1).over(windowSpec) =!= col("tg"),
lead(col("created_at"), 1).over(windowSpec)
).otherwise(lit(null).cast(IntegerType)))
.show()
+---------+----------+--------+----------+-------------------+----------+----------+----------+
| type| key| tg|created_at| timestamp|row_number| startDate| endDate|
+---------+----------+--------+----------+-------------------+----------+----------+----------+
|device_id|essentials|template|1600269347|2020-09-21 19:08:05| 1|1600269347| null|
|device_id|experiment| t1|1599721314|2020-09-17 01:37:17| 1|1599721314|1600228007|
|device_id|experiment| v1|1600228007|2020-09-21 18:07:53| 2|1599721314|1605221085|
|device_id|experiment| c1|1605221085|2020-09-21 18:07:53| 3|1599721314| null|
| test| t_key| t1|1599714939|2020-09-16 01:37:55| 1|1599714939|1600084857|
| test| t_key| t2|1600084857|2020-09-21 17:08:23| 2|1599714939| null|
+---------+----------+--------+----------+-------------------+----------+----------+----------+
【讨论】:
感谢@liamvt 的回答。 嗨@PranavChawla。如果这解决了您的问题,请将答案标记为已接受。以上是关于如何在 row_number() 列上应用转换的主要内容,如果未能解决你的问题,请参考以下文章
SQL Server:row_number 分区不重置计数器
使用带关系的 ROW_NUMBER() OVER 十进制列时未确定的排序顺序
如何在 Hibernate 中使用 row_number 函数编写查询?
如何使用 Spark 数据框列上的函数或方法使用 Scala 进行转换