PySpark - 如何根据列中的两个值从数据框中过滤出连续的行块

Posted

技术标签:

【中文标题】PySpark - 如何根据列中的两个值从数据框中过滤出连续的行块【英文标题】:PySpark - How to filter a consecutive chunk of rows out of a dataframe based on two values in a column 【发布时间】:2019-02-01 12:51:59 【问题描述】:

我有一个数据框,我想使用pyspark 根据某些列值创建另一个数据框。 例如:下面是我的主要数据框-

Part1   Part2   Part3   Part4
aaa      up      24     k-123
bbb     down     45     i-98
ccc     down     54     k-89
fff     int      23     l-34
xyz      up      22     o-89
www      up      89     u-56

现在,我想创建另一个数据框,它将搜索第一次出现的“向下”,直到第一次出现“向上”。因此,预期的数据框将是:

   Part1    Part2   Part3   Part4
    bbb     down     45     i-98
    ccc     down     54     k-89
    fff     int      23     l-34
    xyz      up      22     o-89

【问题讨论】:

【参考方案1】:

第 1 步:创建DataFrame

from pyspark.sql.functions import when, col, lit
df = spark.createDataFrame(
    [('aaa','up',24,'k-123'),('bbb','down',45,'i-98'),('ccc','down',54,'k-89'),
     ('fff','int', 23,'l-34'),('xyz','up',22,'o-89'),('www','up',89,'u-56')], 
    schema = ['Part1','Part2','Part3','Part4']
)
df.show()
+-----+-----+-----+-----+
|Part1|Part2|Part3|Part4|
+-----+-----+-----+-----+
|  aaa|   up|   24|k-123|
|  bbb| down|   45| i-98|
|  ccc| down|   54| k-89|
|  fff|  int|   23| l-34|
|  xyz|   up|   22| o-89|
|  www|   up|   89| u-56|
+-----+-----+-----+-----+

第 2 步:首先我们需要找到down 的第一个匹配项并删除它上面的所有行。为此,我们创建一个列 cumulative,如果 Part2 == down 则值为 1,否则为 0,最后取此列的累积总和。

df = df.withColumn('Dummy',lit('dummy'))
df = df.withColumn('cumulative',when(col('Part2')=='down',1).otherwise(0))
df = df.selectExpr(
    'Part1','Part2','Part3','Part4','Dummy',
    'sum(cumulative) over (order by row_number() over (order by Dummy)) as cumulative'
 )
df.show()
+-----+-----+-----+-----+-----+----------+
|Part1|Part2|Part3|Part4|Dummy|cumulative|
+-----+-----+-----+-----+-----+----------+
|  aaa|   up|   24|k-123|dummy|         0|
|  bbb| down|   45| i-98|dummy|         1|
|  ccc| down|   54| k-89|dummy|         2|
|  fff|  int|   23| l-34|dummy|         2|
|  xyz|   up|   22| o-89|dummy|         2|
|  www|   up|   89| u-56|dummy|         2|
+-----+-----+-----+-----+-----+----------+

现在,删除累积总和为 0 的所有行。这将删除所有行,直到第一次出现 down

df = df.where(col('cumulative')>=1)

第 3 步: 与上述第 2 步中的操作相同,除了对 up 执行此操作并删除列 cumulative 中的值小于或等于 1 的所有行。这我们将删除第一次出现 up 下方的所有行。

df = df.withColumn('cumulative',when(col('Part2')=='up',1).otherwise(0))
df = df.selectExpr(
    'Part1','Part2','Part3','Part4','Dummy',
    'sum(cumulative) over (order by row_number() over (order by Dummy)) as cumulative'
 )
df = df.where(col('cumulative')<=1).drop('Dummy','cumulative')
df.show()
+-----+-----+-----+-----+
|Part1|Part2|Part3|Part4|
+-----+-----+-----+-----+
|  bbb| down|   45| i-98|
|  ccc| down|   54| k-89|
|  fff|  int|   23| l-34|
|  xyz|   up|   22| o-89|
+-----+-----+-----+-----+

【讨论】:

以上是关于PySpark - 如何根据列中的两个值从数据框中过滤出连续的行块的主要内容,如果未能解决你的问题,请参考以下文章

什么函数允许我根据R中列中的值从数据框中的列中提取数据?

动态填充pyspark数据框中列中的行

如何根据 PySpark 数据框的另一列中的值修改列? F.当边缘情况

pySpark:如何在数据框中的 arrayType 列中获取 structType 中的所有元素名称?

PYSPARK:如何在 pyspark 数据框中找到两列的余弦相似度?

从 pyspark 数据框中的列中提取特定字符串