pyspark 使用过滤器应用 DataFrame 窗口函数

Posted

技术标签:

【中文标题】pyspark 使用过滤器应用 DataFrame 窗口函数【英文标题】:pyspark Apply DataFrame window function with filter 【发布时间】:2017-07-28 14:25:28 【问题描述】:

我有一个包含列的数据集:id,timestamp,x,y

id  timestamp   x      y 
0   1443489380  100    1
0   1443489390  200    0
0   1443489400  300    0
0   1443489410  400    1

我定义了一个窗口规范:w = Window.partitionBy("id").orderBy("timestamp")

我想做这样的事情。创建一个新列,将当前行的 x 与下一行的 x 相加。

如果总和 >= 500 则设置新列 = BIG 否则 SMALL。

df = df.withColumn("newCol", 
                   when(df.x + lag(df.x,-1).over(w) >= 500 , "BIG")
                   .otherwise("SMALL") )

但是,我想在此之前过滤数据而不影响原始df

[只有 y =1 的行才会应用上面的代码]

所以将应用上述代码的数据只有这 2 行。

0 , 1443489380, 100 , 1

0 , 1443489410, 400 , 1

我已经这样做了,但是太糟糕了。

df2 = df.filter(df.y == 1)
df2 = df2.withColumn("newCol", 
                     when(df.x + lag(df.x,-1).over(w) >= 500 , "BIG")
                     .otherwise("SMALL") )
df = df.join(df2, ["id","timestamp"], "outer")

我想做这样的事情,但这是不可能的,因为它会导致 AttributeError: 'DataFrame' object has no attribute 'when'

df = df.withColumn("newCol", df.filter(df.y == 1)
                   .when(df.x + lag(df.x,-1).over(w) >= 500 , "BIG")
                   .otherwise("SMALL") )

总之,我只想在 sum x 和下一个 x 之前只对 y =1 的行做一个临时过滤。

【问题讨论】:

你已经从 pyspark.sql.functions 导入了,对吧? 我已经通过 from pyspark.sql.functions import lag 导入,当 【参考方案1】:

您的代码运行良好,我认为您可以使用导入函数模块。试过你的代码,

>>> from pyspark.sql import functions as F
>>> df2 = df2.withColumn("newCol", 
                 F.when((df.x + F.lag(df.x,-1).over(w))>= 500 , "BIG")
                 .otherwise("SMALL") )
>>> df2.show()
+---+----------+---+---+------+
| id| timestamp|  x|  y|newCol|
+---+----------+---+---+------+
|  0|1443489380|100|  1|   BIG|
|  0|1443489410|400|  1| SMALL|
+---+----------+---+---+------+

编辑: 已尝试根据 'id','y' 列更改窗口分区,

>>> w = Window.partitionBy("id","y").orderBy("timestamp")
>>> df.select("*", F.when(df.y == 1,F.when((df.x+F.lag("x",-1).over(w)) >=500,'BIG').otherwise('SMALL')).otherwise(None).alias('new_col')).show()
+---+----------+---+---+-------+
| id| timestamp|  x|  y|new_col|
+---+----------+---+---+-------+
|  0|1443489380|100|  1|    BIG|
|  0|1443489410|400|  1|  SMALL|
|  0|1443489390|200|  0|   null|
|  0|1443489400|300|  0|   null|
+---+----------+---+---+-------+

【讨论】:

是的,但我的目的是使用 df 而不是 df2。如果我使用 df2,那么我必须稍后加入它,它会花费巨大的性能。 也就是说,我想在计算过程中过滤掉df(df.y==1)。但是,df 必须保持不变。过滤后的行只是计算的临时行。 那么,当 y ==0, null 时 newcol 的值应该是多少? 是的,null 很好。 已编辑我的答案,请检查并告诉我是否遗漏任何内容

以上是关于pyspark 使用过滤器应用 DataFrame 窗口函数的主要内容,如果未能解决你的问题,请参考以下文章

当列文本包含超过 10 个单词时过滤 pyspark DataFrame

过滤器生成的 PySpark DataFrame - 它存储在哪里?

PySpark:在日期为字符串的范围内按日期字段过滤DataFrame

Pyspark Dataframe 组通过过滤

如何从 Pyspark Dataframe 中的字符串列中过滤字母值?

在 Pyspark 中使用整数与十进制值进行过滤