在 pyspark 数据框中循环遍历两列时将值添加到新列
Posted
技术标签:
【中文标题】在 pyspark 数据框中循环遍历两列时将值添加到新列【英文标题】:Adding values to a new column while looping through two columns in a pyspark dataframe 【发布时间】:2019-11-12 06:34:52 【问题描述】:我有一个带有列的 pyspark 数据框(除了一些列之外): 每个月都有多个 id。每个 id 的活动状态由金额列确定。如果金额 > 0,则 active = 1,否则为 0。
+-----------------------------+---
|id|amount| dates | active |
+-----------------------------+---
| X| 0|2019-05-01| 0 |
| X| 120|2019-06-01| 1 |
| Y| 60|2019-06-01| 1 |
| X| 0|2019-07-01| 0 |
| Y| 0|2019-07-01| 0 |
| Z| 50|2019-06-01| 1 |
| Y| 0|2019-07-01| 0 |
+-----------------------------+---
我要计算和添加的新列是 p3mactive。 它是根据过去三个月的活跃状态计算的。 例如:对于 id = x,日期 = 2019-08-01,p3mactive = 1,因为 X 在 2019-06-01 中处于活动状态。 如果在此之前的几个月不存在,则 p3m active = 0。如果只有 1 或 2 个月,则 p3m active 可以简单地计算为 max(active(month-1), active(month-2))。基本上是在现有列的基础上。
+-----------------------------+-----------+
|id|amount| dates | active | p3mactive |
+-----------------------------+-----------+
| X| 0|2019-05-01| 0 | 0 |
| X| 120|2019-06-01| 1 | 0 |
| Y| 60|2019-06-01| 1 | 0 |
| X| 0|2019-07-01| 0 | 1 |
| Y| 0|2019-07-01| 0 | 1 |
| Z| 50|2019-06-01| 1 | 0 |
| Y| 0|2019-07-01| 0 | 1 |
+-----------------------------+-----------+
所以基本上:
-
05 的 X 具有活动 0,并且在此之前没有几个月,因此 p3mactive 为 0。
Y 在 06 中激活,因此 p3mactive 在 07 中为 1,而 p3mactive 在 06 中仍为 0。
Z只有06的数据,所以06的p3mactive为0
等等。如果对流程有任何疑问,请告诉我。
我想在 pyspark 中使用更可取的数据帧操作和函数来实现这一点。 一般来说,我可以很容易地想到如何使用 pandas 或 python 来做到这一点,但是我是 spark 新手,想不出一种方法来循环遍历每个给定月份的 id,然后将前三个月的活动状态选择为最大值(m1,m2,m3) 函数,如果前几个月不存在,则保持边缘条件。任何帮助将不胜感激。
【问题讨论】:
【参考方案1】:您可以通过 Window
函数使用 when
和 lag
来执行此操作:
from pyspark.sql.window import Window
from pyspark.sql.functions import when, col, lag
w = Window().partitionBy("id").orderBy("dates")
df = df.withColumn("p3mactive", when(
(lag(df.active,1).over(w) == 1)|
(lag(df.active,2).over(w) == 1) |
(lag(df.active,3).over(w) == 1), 1).otherwise(0))
您不能循环遍历 pyspark 数据帧,但您可以使用 Window
跨越它们。您可以使用when
应用条件,您可以使用lag
查看以前的行,使用lead
查看未来的行。如果x
之前的行不存在,则条件评估为假,您将获得0
作为您的用例提及。
我希望这会有所帮助。
【讨论】:
嗨。谢谢您的回答。这很有帮助。我将检查我的数据并将其标记为已接受。再次感谢。 @Sushant 不客气。我希望你达到预期的输出 嗨。是的。它按预期工作!只是一次更正。我想你的意思是把它写成 df = df.withColumn("p3mactive", when( (lag(col("active"), 1).over(w) == 1) | (lag(col("active" ), 2).over(w) == 1) | (lag(col("active"), 3).over(w) == 1), 1).otherwise(0))。滞后函数超过窗口,然后是比较。 :) 我只会编辑答案。非常感谢!以上是关于在 pyspark 数据框中循环遍历两列时将值添加到新列的主要内容,如果未能解决你的问题,请参考以下文章
Python:如何在比较其他列时将列值填充到另一个数据框中的新列?
PYSPARK:如何在 pyspark 数据框中找到两列的余弦相似度?