Pyspark 数据帧,在标志之间迭代,基于组

Posted

技术标签:

【中文标题】Pyspark 数据帧,在标志之间迭代,基于组【英文标题】:Pyspark dataframe, iterate between flags, based on group 【发布时间】:2018-10-07 15:35:49 【问题描述】:

我需要在 pyspark 数据帧上的事件之间创建一个计数器,如下所示:

输入:

+-------+----+------+  
|machine|date|event |
+-------+----+------+  
| M1    |DAY1|     1|
| M1    |DAY2|     0|
| M1    |DAY3|     0|
| M1    |DAY4|     1|
| M1    |DAY5|     0|
+-------+----+------+ 

预期输出:

+-------+----+------+----------------------+  
|machine|date|event |days since last event |
+-------+----+------+----------------------+  
| M1    |DAY1|     1|                     0|
| M1    |DAY2|     0|                     1|
| M1    |DAY3|     0|                     2|
| M1    |DAY4|     1|                     3|
| M1    |DAY5|     0|                     1|
+-------+----+------+----------------------+ 

我看到了 Window 函数,但我不知道如何创建一个 if 语句,它会在遇到另一个标志事件后重新启动计数器。

关于我该怎么做的任何想法?

【问题讨论】:

您需要在machine 列上分组还是事件序列与机器无关? @pansen 依赖机器,当机器组结束时,计数器也必须重新启动 【参考方案1】:

在这种情况下,您需要使用多个窗口函数。你可以在下面找到我的解决方案

>>> from pyspark.sql.window import Window
>>> import pyspark.sql.functions as F
>>> 
>>> df = sc.parallelize([
...     ('M1','DAY1',1),
...     ('M1','DAY2',0),
...     ('M1','DAY3',0),
...     ('M1','DAY4',1),
...     ('M1','DAY5',0)
...     ]).toDF(['machine','date','event'])
>>> 
>>> df.show()
+-------+----+-----+
|machine|date|event|
+-------+----+-----+
|     M1|DAY1|    1|
|     M1|DAY2|    0|
|     M1|DAY3|    0|
|     M1|DAY4|    1|
|     M1|DAY5|    0|
+-------+----+-----+

>>> window1 = Window.partitionBy('machine').orderBy('date')
>>> window2 = Window.partitionBy('machine','new_col').orderBy('date')
>>> 
>>> df = df.withColumn('new_col', F.sum(F.lag('event').over(window1)).over(window1))
>>> df = df.withColumn('days_since_last_event', F.when(F.isnull('new_col')==True,0).otherwise(F.rank().over(window2)))
>>> 
>>> df = df.drop('new_col')
>>> 
>>> df.show()
+-------+----+-----+---------------------+                                      
|machine|date|event|days_since_last_event|
+-------+----+-----+---------------------+
|     M1|DAY1|    1|                    0|
|     M1|DAY2|    0|                    1|
|     M1|DAY3|    0|                    2|
|     M1|DAY4|    1|                    3|
|     M1|DAY5|    0|                    1|
+-------+----+-----+---------------------+

【讨论】:

很好的解决方案!我喜欢在这里使用rank 很好的解决方案@Ali,非常感谢!但我仍然无法弄清楚这是如何工作的。你有一些我可以阅读的教程然后了解它是如何在引擎盖下工作的吗?关于它的官方文档和数据块帖子并没有我想要的那么具有指导意义。 @LeandroHumb,下面有一个关于窗口函数的很棒的教程。希望对您的理解有所帮助——jaceklaskowski.gitbooks.io/mastering-spark-sql/…

以上是关于Pyspark 数据帧,在标志之间迭代,基于组的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark:在数据帧的不同组上应用 kmeans

使用复杂的条件逻辑加入 Pyspark 数据帧(可能使用地图代替)

pyspark 从数据帧迭代 N 行到每次执行

为啥在使用 pyspark 加入 Spark 数据帧时出现这些 Py4JJavaError showString 错误?

PySpark:我们应该迭代更新数据帧吗?

Pyspark 错误:“Py4JJavaError:调用 o655.count 时出错。”在数据帧上调用 count() 方法时