如何在 row_number() 列上应用转换

Posted

技术标签:

【中文标题】如何在 row_number() 列上应用转换【英文标题】:How to apply tranformation on row_number() column 【发布时间】:2020-09-21 16:39:32 【问题描述】:

需要应用转换,以便将分区组的 created_at 字段的第一个值添加到整个分区组的新列 startDate 中。

其次,每当 tg 更改“type”和“key”列的相同值时,新列的 created_at 字段应该成为其上方具有相同“type”和“key”的行的 endDate,否则它保持为空。

type             key         tg      created_at       timestamp       row_number

device_id    essentials    template   1600269347   2020-09-21 19:08:05      1                           
device_id    experiment      t1       1599721314   2020-09-17 01:37:17      1                                                    
device_id    experiment      v1       1600228007   2020-09-21 18:07:53      2
device_id    experiment      c1       1605221085   2020-09-21 18:07:53      3
test         t_key           t1       1599714939   2020-09-16 01:37:55      1
test         t_key           t2       1600084857   2020-09-21 17:08:23      2

到目前为止的应用步骤-: val windowSpec = Window.partitionBy("type","key").orderBy("timestamp") test.withColumn("row_number",row_number.over(windowSpec)).show()

预期输出-:

type        key         tg      created_at       timestamp     row_number startDate  endDate

device_id  essentials template 1600269347   2020-09-21 19:08:05  1        1600269347  null                
device_id  experiment   t1     1599721314   2020-09-17 01:37:17  1        1599721314  1600228007                                      
device_id  experiment   v1     1600228007   2020-09-21 18:07:53  2        1599721314  1605221085
device_id  experiment   c1     1605221085   2020-09-21 18:07:53  3        1599721314  null
test       t_key        t1     1599714939   2020-09-16 01:37:55  1        1599714939  1600084857
test       t_key        t2     1600084857   2020-09-21 17:08:23  2        1599714939  null

关于如何进行的任何建议?

【问题讨论】:

【参考方案1】:

您可以在窗口上使用first 来获取 created_at 的第一个值。 min 在这种情况下也可以使用。

第二个有点棘手。您需要使用lag 并记住窗口中最后一行的滞后结果将始终为空。

val schema =  List(
  StructField("type", StringType, true),
  StructField("key", StringType, true),
  StructField("tg", StringType, true),
  StructField("created_at", IntegerType, true),
  StructField("timestamp", TimestampType, true),
  StructField("row_number", IntegerType, true)
)

val data =  Seq(
    Row("device_id", "essentials", "template", 1600269347, Timestamp.valueOf("2020-09-21 19:08:05"), 1),
    Row("device_id", "experiment", "t1", 1599721314, Timestamp.valueOf("2020-09-17 01:37:17"), 1),  
    Row("device_id", "experiment", "v1", 1600228007, Timestamp.valueOf("2020-09-21 18:07:53"), 2),
    Row("device_id", "experiment", "c1", 1605221085, Timestamp.valueOf("2020-09-21 18:07:53"), 3),
    Row("test", "t_key", "t1", 1599714939, Timestamp.valueOf("2020-09-16 01:37:55"), 1),
    Row("test", "t_key", "t2", 1600084857, Timestamp.valueOf("2020-09-21 17:08:23"), 2)
  )

val test = spark.createDataFrame(spark.sparkContext.parallelize(data), StructType(schema))

val windowSpec = Window.partitionBy("type","key").orderBy("timestamp")
test
.withColumn("startDate", first(col("created_at")).over(windowSpec))
.withColumn("endDate", when(
  lead(col("tg"), 1).over(windowSpec).isNotNull && 
  lead(col("tg"), 1).over(windowSpec) =!= col("tg"), 
  lead(col("created_at"), 1).over(windowSpec)
).otherwise(lit(null).cast(IntegerType)))
.show()
+---------+----------+--------+----------+-------------------+----------+----------+----------+
|     type|       key|      tg|created_at|          timestamp|row_number| startDate|   endDate|
+---------+----------+--------+----------+-------------------+----------+----------+----------+
|device_id|essentials|template|1600269347|2020-09-21 19:08:05|         1|1600269347|      null|
|device_id|experiment|      t1|1599721314|2020-09-17 01:37:17|         1|1599721314|1600228007|
|device_id|experiment|      v1|1600228007|2020-09-21 18:07:53|         2|1599721314|1605221085|
|device_id|experiment|      c1|1605221085|2020-09-21 18:07:53|         3|1599721314|      null|
|     test|     t_key|      t1|1599714939|2020-09-16 01:37:55|         1|1599714939|1600084857|
|     test|     t_key|      t2|1600084857|2020-09-21 17:08:23|         2|1599714939|      null|
+---------+----------+--------+----------+-------------------+----------+----------+----------+

【讨论】:

感谢@liamvt 的回答。 嗨@PranavChawla。如果这解决了您的问题,请将答案标记为已接受。

以上是关于如何在 row_number() 列上应用转换的主要内容,如果未能解决你的问题,请参考以下文章

SQL Server:row_number 分区不重置计数器

使用带关系的 ROW_NUMBER() OVER 十进制列时未确定的排序顺序

如何在 Hibernate 中使用 row_number 函数编写查询?

如何使用 Spark 数据框列上的函数或方法使用 Scala 进行转换

如何在 Room @Query 中使用 ROW_NUMBER()?

如何在列上应用函数[重复]