Window function to convert n number of rows in a column to one single row
Posted 2019-11-25 17:56:17

I need to window 25 rows from a column into a single row in a dataframe. The input data looks like this:
+------+----------+---------------------------------------+
|ID |TIME |SGNL |
+------+----------+---------------------------------------+
|00001 |1574360355|"SN":"Acc","ST":1574360296,"SV":"0.0"|
|00001 |1574360355|"SN":"Acc","ST":1574360296,"SV":"0.0"|
|00001 |1574360355|"SN":"Acc","ST":1574360296,"SV":"0.0"|
|00001 |1574360355|"SN":"Acc","ST":1574360297,"SV":"0.0"|
|00002 |1574360355|"SN":"Acc","ST":1574360297,"SV":"0.0"|
|00002 |1574360355|"SN":"Acc","ST":1574360297,"SV":"0.0"|
|00002 |1574360355|"SN":"Acc","ST":1574360298,"SV":"0.0"|
+------+----------+---------------------------------------+
I have to apply a window function here to fetch 25 SGNL values for a particular ID, ordered by time, into a single row. I have already set up a window partitioning the dataframe by ID and ordering by TIME. Now I need to get data like the following:
+------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
|DTC |DTCTS |SGNL |
+------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
|00001 |1574360355|"SN":"Acc","ST":1574360296,"SV":"0.0","SN":"Acc","ST":1574360296,"SV":"0.0","SN":"Acc","ST":1574360296,"SV":"0.0","SN":"Acc","ST":1574360297,"SV":"0.0"|
|00002 |1574360355|"SN":"Acc","ST":1574360297,"SV":"0.0","SN":"Acc","ST":1574360297,"SV":"0.0","SN":"Acc","ST":1574360298,"SV":"0.0" |
+------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
As shown above, the first 25 rows of the SGNL column for a given partition should be merged into a single row. Is there any way to achieve this?
Answer 1: You can combine two window functions to achieve this:
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val df = Seq(
("00001",1574360355,""""SN":"Acc","ST":1574360296,"SV":"0.0""""),
("00001",1574360355,""""SN":"Acc","ST":1574360296,"SV":"0.0""""),
("00001",1574360355,""""SN":"Acc","ST":1574360296,"SV":"0.0""""),
("00001",1574360355,""""SN":"Acc","ST":1574360297,"SV":"0.0""""),
("00002",1574360355,""""SN":"Acc","ST":1574360297,"SV":"0.0""""),
("00002",1574360355,""""SN":"Acc","ST":1574360297,"SV":"0.0""""),
("00002",1574360355,""""SN":"Acc","ST":1574360298,"SV":"0.0"""")
).toDF("ID", "TIME", "SGNL")
val win = Window.partitionBy($"ID").orderBy($"TIME")
df
.withColumn("rnb",row_number().over(win))
.where($"rnb"<=25) // limit to first 25 rows
.withColumn("SGNL",collect_list($"SGNL").over(win))
.where($"rnb"===1) // collapse to 1 record per ID
.withColumn("SGNL",concat_ws(",",$"SGNL")) // convert array to single string
.drop($"rnb")
.show()
which gives:
+-----+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ID |TIME |SGNL |
+-----+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
|00001|1574360355|"SN":"Acc","ST":1574360296,"SV":"0.0","SN":"Acc","ST":1574360296,"SV":"0.0","SN":"Acc","ST":1574360296,"SV":"0.0","SN":"Acc","ST":1574360297,"SV":"0.0"|
|00002|1574360355|"SN":"Acc","ST":1574360297,"SV":"0.0","SN":"Acc","ST":1574360297,"SV":"0.0","SN":"Acc","ST":1574360298,"SV":"0.0" |
+-----+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
Note that TIME now represents the minimum TIME within each aggregated record. If you want the maximum TIME instead, you need another window function to find the maximal rnb, and then filter on it.
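The semantics of that max-TIME variant can be sketched without Spark, over plain Scala collections: per ID, sort by TIME, keep at most the first 25 signals, and report the maximum TIME of the rows that were aggregated. The column values below are made-up placeholders standing in for the sample data; this illustrates the intended result, not the Spark code itself.

```scala
// Sample rows as (ID, TIME, SGNL); TIME values deliberately differ within an ID
val rows = Seq(
  ("00001", 1574360355L, "sgnlA"),
  ("00001", 1574360356L, "sgnlB"),
  ("00002", 1574360355L, "sgnlC")
)

// Per ID: sort by TIME, take at most 25 rows, join the signals,
// and keep the maximum TIME of the aggregated rows
val collapsed = rows
  .groupBy(_._1)
  .map { case (id, rs) =>
    val sorted = rs.sortBy(_._2).take(25)
    (id, sorted.map(_._2).max, sorted.map(_._3).mkString(","))
  }
  .toSeq
  .sortBy(_._1)

collapsed.foreach(println)
```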
Answer 2 (updated):
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.IntegerType

val df = (Seq(
  ("00002",1574360355,""""SN":"Acc","ST":1574360297,"SV":"0.0""""),
  ("00002",1574360355,""""SN":"Acc","ST":1574360297,"SV":"0.0""""),
  ("00002",1574360355,""""SN":"Acc","ST":1574360298,"SV":"0.0"""")
) ++ (1 to 51).map(_ => ("00001",1574360355,""""SN":"Acc","ST":1574360296,"SV":"0.0"""")))
  .toDF("ID", "TIME", "SGNL")
  .withColumn("rownum", row_number().over(Window.partitionBy($"ID").orderBy($"TIME")))
df.groupBy($"ID", (($"rownum" - 1) / 25).cast(IntegerType).as("by25")) // bucket rows into chunks of 25 per ID
  .agg(min($"TIME"), collect_list($"SGNL"))
  .toDF("DTC", "by25", "DTCTS", "SGNL")
.show(false)
+-----+----+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|DTC |by25|DTCTS |SGNL |
+-----+----+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|00001|0 |1574360355|["SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0"]|
|00001|1 |1574360355|["SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0", "SN":"Acc","ST":1574360296,"SV":"0.0"]|
|00001|2 |1574360355|["SN":"Acc","ST":1574360296,"SV":"0.0"] |
|00002|0 |1574360355|["SN":"Acc","ST":1574360297,"SV":"0.0", "SN":"Acc","ST":1574360297,"SV":"0.0", "SN":"Acc","ST":1574360298,"SV":"0.0"] |
+-----+----+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Note that the result is now an array; apply concat_ws as in the first answer if you need a single string.
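The bucketing key used above, ((rownum - 1) / 25) cast to an integer, can be sanity-checked in plain Scala: row numbers 1 to 25 fall into bucket 0, 26 to 50 into bucket 1, and row 51 into bucket 2, matching the three "00001" output rows shown.

```scala
// Replicate the bucket computation: (rownum - 1) / 25 with integer division
val buckets = (1 to 51).map(rownum => (rownum - 1) / 25)

// 51 rows for one ID split into chunks of sizes 25, 25 and 1
val sizes = buckets.groupBy(identity).toSeq.sortBy(_._1).map { case (b, rs) => (b, rs.size) }
println(sizes)
```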
Comments:
I want all the columns - the first 25 in one row, the next 25 in another row, and so on. The where clause used here will only allow access to the first 25 rows.