Pyspark 数据帧,在标志之间迭代,基于组
Posted
技术标签:
【中文标题】Pyspark 数据帧,在标志之间迭代,基于组【英文标题】:Pyspark dataframe, iterate between flags, based on group 【发布时间】:2018-10-07 15:35:49 【问题描述】:我需要在 pyspark 数据帧上的事件之间创建一个计数器,如下所示:
输入:
+-------+----+------+
|machine|date|event |
+-------+----+------+
| M1 |DAY1| 1|
| M1 |DAY2| 0|
| M1 |DAY3| 0|
| M1 |DAY4| 1|
| M1 |DAY5| 0|
+-------+----+------+
预期输出:
+-------+----+------+----------------------+
|machine|date|event |days since last event |
+-------+----+------+----------------------+
| M1 |DAY1| 1| 0|
| M1 |DAY2| 0| 1|
| M1 |DAY3| 0| 2|
| M1 |DAY4| 1| 3|
| M1 |DAY5| 0| 1|
+-------+----+------+----------------------+
我看到了 Window 函数,但我不知道如何创建一个 if 语句,它会在遇到另一个标志事件后重新启动计数器。
关于我该怎么做的任何想法?
【问题讨论】:
您需要在machine
列上分组还是事件序列与机器无关?
@pansen 依赖机器,当机器组结束时,计数器也必须重新启动
【参考方案1】:
在这种情况下,您需要使用多个窗口函数。你可以在下面找到我的解决方案
>>> from pyspark.sql.window import Window
>>> import pyspark.sql.functions as F
>>>
>>> df = sc.parallelize([
... ('M1','DAY1',1),
... ('M1','DAY2',0),
... ('M1','DAY3',0),
... ('M1','DAY4',1),
... ('M1','DAY5',0)
... ]).toDF(['machine','date','event'])
>>>
>>> df.show()
+-------+----+-----+
|machine|date|event|
+-------+----+-----+
| M1|DAY1| 1|
| M1|DAY2| 0|
| M1|DAY3| 0|
| M1|DAY4| 1|
| M1|DAY5| 0|
+-------+----+-----+
>>> window1 = Window.partitionBy('machine').orderBy('date')
>>> window2 = Window.partitionBy('machine','new_col').orderBy('date')
>>>
>>> df = df.withColumn('new_col', F.sum(F.lag('event').over(window1)).over(window1))
>>> df = df.withColumn('days_since_last_event', F.when(F.isnull('new_col')==True,0).otherwise(F.rank().over(window2)))
>>>
>>> df = df.drop('new_col')
>>>
>>> df.show()
+-------+----+-----+---------------------+
|machine|date|event|days_since_last_event|
+-------+----+-----+---------------------+
| M1|DAY1| 1| 0|
| M1|DAY2| 0| 1|
| M1|DAY3| 0| 2|
| M1|DAY4| 1| 3|
| M1|DAY5| 0| 1|
+-------+----+-----+---------------------+
【讨论】:
很好的解决方案!我喜欢在这里使用rank
。
很好的解决方案@Ali,非常感谢!但我仍然无法弄清楚这是如何工作的。你有一些我可以阅读的教程然后了解它是如何在引擎盖下工作的吗?关于它的官方文档和数据块帖子并没有我想要的那么具有指导意义。
@LeandroHumb,下面有一个关于窗口函数的很棒的教程。希望对您的理解有所帮助——jaceklaskowski.gitbooks.io/mastering-spark-sql/…以上是关于Pyspark 数据帧,在标志之间迭代,基于组的主要内容,如果未能解决你的问题,请参考以下文章
使用复杂的条件逻辑加入 Pyspark 数据帧(可能使用地图代替)
为啥在使用 pyspark 加入 Spark 数据帧时出现这些 Py4JJavaError showString 错误?
Pyspark 错误:“Py4JJavaError:调用 o655.count 时出错。”在数据帧上调用 count() 方法时