在 Spark 中基于窗口和条件创建新列
Posted
技术标签:
【中文标题】在 Spark 中基于窗口和条件创建新列【英文标题】:Creating a new column based on a window and a condition in Spark 【发布时间】:2020-06-07 21:44:59 【问题描述】:初始数据帧:
+------------------------------+----------+-------+
| Timestamp | Property | Value |
+------------------------------+----------+-------+
| 2019-09-01T01:36:57.000+0000 | X | N |
| 2019-09-01T01:37:39.000+0000 | A | 3 |
| 2019-09-01T01:42:55.000+0000 | X | Y |
| 2019-09-01T01:53:44.000+0000 | A | 17 |
| 2019-09-01T01:55:34.000+0000 | A | 9 |
| 2019-09-01T01:57:32.000+0000 | X | N |
| 2019-09-01T02:59:40.000+0000 | A | 2 |
| 2019-09-01T02:00:03.000+0000 | A | 16 |
| 2019-09-01T02:01:40.000+0000 | X | Y |
| 2019-09-01T02:04:03.000+0000 | A | 21 |
+------------------------------+----------+-------+
最终数据帧:
+------------------------------+----------+-------+---+
| Timestamp | Property | Value | X |
+------------------------------+----------+-------+---+
| 2019-09-01T01:37:39.000+0000 | A | 3 | N |
| 2019-09-01T01:53:44.000+0000 | A | 17 | Y |
| 2019-09-01T01:55:34.000+0000 | A | 9 | Y |
| 2019-09-01T02:00:03.000+0000 | A | 16 | N |
| 2019-09-01T02:04:03.000+0000 | A | 21 | Y |
| 2019-09-01T02:59:40.000+0000 | A | 2 | Y |
+------------------------------+----------+-------+---+
基本上,我有一个时间戳、一个属性和一个值字段。该属性可以是A
或X
,并且它有一个值。我想根据X
属性的值创建一个新的DataFrame,其中第四列名为X
。
-
我开始浏览从最早到最旧的行。
我遇到了具有 X 属性的行,我存储了它的值并将其插入到 X 列中。
如果我遇到 A 属性行:我将上一步中存储的值插入 X 列。
ELSE(意味着我遇到了 X 属性行):我更新了存储的值(因为它是更新的)并将新的存储值插入到 X 列中。
我一直这样做,直到完成整个数据框。
我删除了带有 X 属性的行以使最终数据框显示在上面。
我确信有某种方法可以通过 Window 函数有效地做到这一点。
【问题讨论】:
给定 TS 2:59:40,2 的 X 值在预期输出中应该是 Y。对吗? 是的,感谢您的编辑。 【参考方案1】:用值 X 的值创建一个临时列,如果 A 则为空。然后使用窗口获取最后一个非空临时值。最后过滤属性“A”。
scala> val df = Seq(
| ("2019-09-01T01:36:57.000+0000", "X", "N"),
| ("2019-09-01T01:37:39.000+0000", "A", "3"),
| ("2019-09-01T01:42:55.000+0000", "X", "Y"),
| ("2019-09-01T01:53:44.000+0000", "A", "17"),
| ("2019-09-01T01:55:34.000+0000", "A", "9"),
| ("2019-09-01T01:57:32.000+0000", "X", "N"),
| ("2019-09-01T02:59:40.000+0000", "A", "2"),
| ("2019-09-01T02:00:03.000+0000", "A", "16"),
| ("2019-09-01T02:01:40.000+0000", "X", "Y"),
| ("2019-09-01T02:04:03.000+0000", "A", "21")
| ).toDF("Timestamp", "Property", "Value").withColumn("Temp", when($"Property" === "X", $"Value").otherwise(null))
df: org.apache.spark.sql.DataFrame = [Timestamp: string, Property: string ... 2 more fields]
scala> df.show(false)
+----------------------------+--------+-----+----+
|Timestamp |Property|Value|Temp|
+----------------------------+--------+-----+----+
|2019-09-01T01:36:57.000+0000|X |N |N |
|2019-09-01T01:37:39.000+0000|A |3 |null|
|2019-09-01T01:42:55.000+0000|X |Y |Y |
|2019-09-01T01:53:44.000+0000|A |17 |null|
|2019-09-01T01:55:34.000+0000|A |9 |null|
|2019-09-01T01:57:32.000+0000|X |N |N |
|2019-09-01T02:59:40.000+0000|A |2 |null|
|2019-09-01T02:00:03.000+0000|A |16 |null|
|2019-09-01T02:01:40.000+0000|X |Y |Y |
|2019-09-01T02:04:03.000+0000|A |21 |null|
+----------------------------+--------+-----+----+
scala> val overColumns = Window.orderBy("TimeStamp").rowsBetween(Window.unboundedPreceding, Window.currentRow)
overColumns: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@1b759662
scala> df.withColumn("X", last($"Temp",true).over(overColumns)).show(false)
+----------------------------+--------+-----+----+---+
|Timestamp |Property|Value|Temp|X |
+----------------------------+--------+-----+----+---+
|2019-09-01T01:36:57.000+0000|X |N |N |N |
|2019-09-01T01:37:39.000+0000|A |3 |null|N |
|2019-09-01T01:42:55.000+0000|X |Y |Y |Y |
|2019-09-01T01:53:44.000+0000|A |17 |null|Y |
|2019-09-01T01:55:34.000+0000|A |9 |null|Y |
|2019-09-01T01:57:32.000+0000|X |N |N |N |
|2019-09-01T02:00:03.000+0000|A |16 |null|N |
|2019-09-01T02:01:40.000+0000|X |Y |Y |Y |
|2019-09-01T02:04:03.000+0000|A |21 |null|Y |
|2019-09-01T02:59:40.000+0000|A |2 |null|Y |
+----------------------------+--------+-----+----+---+
scala> df.withColumn("X", last($"Temp",true).over(overColumns)).filter($"Property" === "A").show(false)
+----------------------------+--------+-----+----+---+
|Timestamp |Property|Value|Temp|X |
+----------------------------+--------+-----+----+---+
|2019-09-01T01:37:39.000+0000|A |3 |null|N |
|2019-09-01T01:53:44.000+0000|A |17 |null|Y |
|2019-09-01T01:55:34.000+0000|A |9 |null|Y |
|2019-09-01T02:00:03.000+0000|A |16 |null|N |
|2019-09-01T02:04:03.000+0000|A |21 |null|Y |
|2019-09-01T02:59:40.000+0000|A |2 |null|Y |
+----------------------------+--------+-----+----+---+
【讨论】:
谢谢,我就是这么问的。如果在初始数据帧中我还有一个名为“device_ID”的分类字段,我想要一个最终数据帧,就像您设法生成的数据帧一样,但是关于每个设备。换句话说,对device_ID
的每个值分别执行相同的操作。这意味着时间戳顺序仅对具有相同device_ID
的行很重要。当然,特定的device_id
同时具有 A 和 X 属性。
Window.partitionBy("device_id").orderBy("TimeStamp").rowsBet....
你像这样按“device_id”分区
哇,这么简单/直接!我不知道这个partitionBy
函数。您是否认为应该使用其他子问题编辑问题并要求也编辑您的答案?我认为它可能对其他用户有用..以上是关于在 Spark 中基于窗口和条件创建新列的主要内容,如果未能解决你的问题,请参考以下文章
使用 pyspark 基于 if 和 else 条件创建新列
如何使用 Scala/Spark 添加不基于数据框中现有列的新列? [复制]