在 pyspark 数据框中循环遍历两列时将值添加到新列

Posted

技术标签:

【中文标题】在 pyspark 数据框中循环遍历两列时将值添加到新列【英文标题】:Adding values to a new column while looping through two columns in a pyspark dataframe 【发布时间】:2019-11-12 06:34:52 【问题描述】:

我有一个带有列的 pyspark 数据框(除了一些列之外): 每个月都有多个 id。每个 id 的活动状态由金额列确定。如果金额 > 0,则 active = 1,否则为 0。

+-----------------------------+---
|id|amount|  dates   | active |
+-----------------------------+---
| X|     0|2019-05-01|    0   |
| X|   120|2019-06-01|    1   |      
| Y|    60|2019-06-01|    1   |
| X|     0|2019-07-01|    0   |
| Y|     0|2019-07-01|    0   |
| Z|    50|2019-06-01|    1   |
| Y|     0|2019-07-01|    0   |
+-----------------------------+---

我要计算和添加的新列是 p3mactive。 它是根据过去三个月的活跃状态计算的。 例如:对于 id = x,日期 = 2019-08-01,p3mactive = 1,因为 X 在 2019-06-01 中处于活动状态。 如果在此之前的几个月不存在,则 p3m active = 0。如果只有 1 或 2 个月,则 p3m active 可以简单地计算为 max(active(month-1), active(month-2))。基本上是在现有列的基础上。

+-----------------------------+-----------+
|id|amount|  dates   | active | p3mactive |
+-----------------------------+-----------+
| X|     0|2019-05-01|    0   |     0     |
| X|   120|2019-06-01|    1   |     0     |      
| Y|    60|2019-06-01|    1   |     0     |
| X|     0|2019-07-01|    0   |     1     |
| Y|     0|2019-07-01|    0   |     1     |
| Z|    50|2019-06-01|    1   |     0     |
| Y|     0|2019-07-01|    0   |     1     |
+-----------------------------+-----------+

所以基本上:

    05 的 X 具有活动 0,并且在此之前没有几个月,因此 p3mactive 为 0。 Y 在 06 中激活,因此 p3mactive 在 07 中为 1,而 p3mactive 在 06 中仍为 0。 Z只有06的数据,所以06的p3mactive为0

等等。如果对流程有任何疑问,请告诉我。

我想在 pyspark 中使用更可取的数据帧操作和函数来实现这一点。 一般来说,我可以很容易地想到如何使用 pandas 或 python 来做到这一点,但是我是 spark 新手,想不出一种方法来循环遍历每个给定月份的 id,然后将前三个月的活动状态选择为最大值(m1,m2,m3) 函数,如果前几个月不存在,则保持边缘条件。任何帮助将不胜感激。

【问题讨论】:

【参考方案1】:

您可以通过 Window 函数使用 whenlag 来执行此操作:

from pyspark.sql.window import Window
from pyspark.sql.functions import when, col, lag

w = Window().partitionBy("id").orderBy("dates")
df = df.withColumn("p3mactive", when(
    (lag(df.active,1).over(w) == 1)| 
    (lag(df.active,2).over(w) == 1) | 
    (lag(df.active,3).over(w) == 1), 1).otherwise(0))

您不能循环遍历 pyspark 数据帧,但您可以使用 Window 跨越它们。您可以使用when 应用条件,您可以使用lag 查看以前的行,使用lead 查看未来的行。如果x 之前的行不存在,则条件评估为假,您将获得0 作为您的用例提及。

我希望这会有所帮助。

【讨论】:

嗨。谢谢您的回答。这很有帮助。我将检查我的数据并将其标记为已接受。再次感谢。 @Sushant 不客气。我希望你达到预期的输出 嗨。是的。它按预期工作!只是一次更正。我想你的意思是把它写成 df = df.withColumn("p3mactive", when( (lag(col("active"), 1).over(w) == 1) | (lag(col("active" ), 2).over(w) == 1) | (lag(col("active"), 3).over(w) == 1), 1).otherwise(0))。滞后函数超过窗口,然后是比较。 :) 我只会编辑答案。非常感谢!

以上是关于在 pyspark 数据框中循环遍历两列时将值添加到新列的主要内容,如果未能解决你的问题,请参考以下文章

Python:如何在比较其他列时将列值填充到另一个数据框中的新列?

PYSPARK:如何在 pyspark 数据框中找到两列的余弦相似度?

pyspark数据框中两列之间的时间差

在 pyspark 中比较不同数据框中的两列,分别为 String 和 Array<string> 类型

Javascript在选择时将值生成到表单文本框中

遍历 pyspark 数据框中的列,而不为单个列创建不同的数据框