在 Pyspark 中,如何在 partitionBy 和 orderBy 之后进行分组?
Posted
技术标签:
【中文标题】在 Pyspark 中,如何在 partitionBy 和 orderBy 之后进行分组?【英文标题】:In Pyspark, how to group after a partitionBy and orderBy? 【发布时间】:2018-10-15 15:34:16 【问题描述】:我有一个包含事件的 Spark 数据帧 (Pyspark 2.2.0),每个事件都有一个时间戳。还有一个包含一系列标签(A、B、C 或 Null)的附加列。我想为每一行计算 - 按事件组,按时间戳排序 - 当前最长的非 Null 标签更改的计数(Null 应将此计数重置为 0)。带有我理想计算列的 df 示例,称为stretch:
event timestamp tag stretch
G1 09:59:00 Null 0
G1 10:00:00 A 1 ---> first non Null tag starts the count
G1 10:01:00 A 1 ---> no change of tag
G1 10:02:00 B 2 ---> change of tag (A to B)
G1 10:03:00 A 3 ---> change of tag (B to A)
G1 10:04:00 Null 0 ---> Null resets the count
G1 10:05:00 A 1 ---> first non Null tag restarts the count
G2 10:00:00 B 1 ---> first non Null tag starts the count
G2 10:01:00 C 2 ---> change of tag (B to C)
在 Pyspark 中我可以这样定义一个窗口:
window = Window.partitionBy("event").orderBy(col("timestamp").asc())
并计算例如标签的变化:
df = df.withColumn("change_of_tag",col("tag")!=lag("tag",1).over(window))
但我找不到如何计算每次遇到 Null 标记时都会重置的这些更改的累积总和。我怀疑我应该定义一个按事件和标签类型(Null 或非 Null)分区的新窗口,但我不知道如何按事件分区,按时间戳排序,然后按标签类型分组。
【问题讨论】:
您是否尝试过使用 .groupBy 并基于 aggs 求和? 你能添加预期的结果吗? @AliYesilli 预期结果已经存在,是拉伸列,如果不清楚,请见谅。 @Prazy 问题是我不知道如何分组 after 一个 orderby (我必须按事件分组,按时间戳排序,然后按非 null 分组/空标签)。 【参考方案1】:我认为这是一个非常棘手的案例。特别是“标签不变”的情况很难在一个过程中处理。所以你可以在下面找到我的解决方案。我必须创建一些新的计算列来获得结果
>>> import pyspark.sql.functions as F
>>> from pyspark.sql.window import Window
>>>
>>> df.show()
+-----+---------+----+
|event|timestamp| tag|
+-----+---------+----+
| G1| 09:59:00|null|
| G1| 10:00:00| A|
| G1| 10:01:00| A|
| G1| 10:02:00| B|
| G1| 10:03:00| A|
| G1| 10:04:00|null|
| G1| 10:05:00| A|
| G2| 10:00:00| B|
| G2| 10:01:00| C|
+-----+---------+----+
>>> df = df.withColumn('new_col1', F.when(F.isnull('tag'),1).otherwise(0))
>>>
>>> window1 = Window.partitionBy('event').orderBy('timestamp')
>>>
>>> df = df.withColumn('new_col2', F.row_number().over(window1))
>>> df = df.withColumn('new_col3', F.lag('tag').over(window1))
>>> df = df.withColumn('new_col4', F.lag('new_col2').over(window1))
>>> df = df.withColumn('new_col4', F.when(df['new_col3']==df['tag'],df['new_col4']).otherwise(df['new_col2']))
>>> df = df.withColumn('new_col5', F.sum('new_col1').over(window1))
>>> df = df.withColumn('new_col5', F.when(F.isnull('tag'),None).otherwise(df['new_col5']))
>>>
>>> window2 = Window.partitionBy('event','new_col5').orderBy('new_col4')
>>>
>>> df = df.withColumn('new_col6', F.when(F.isnull('tag'),0).otherwise(F.dense_rank().over(window2)))
>>> df = df.select('event','timestamp','tag', df['new_col6'].alias('stretch'))
>>>
>>> df.sort(["event", "timestamp"], ascending=[1, 1]).show()
+-----+---------+----+-------+
|event|timestamp| tag|stretch|
+-----+---------+----+-------+
| G1| 09:59:00|null| 0|
| G1| 10:00:00| A| 1|
| G1| 10:01:00| A| 1|
| G1| 10:02:00| B| 2|
| G1| 10:03:00| A| 3|
| G1| 10:04:00|null| 0|
| G1| 10:05:00| A| 1|
| G2| 10:00:00| B| 1|
| G2| 10:01:00| C| 2|
+-----+---------+----+-------+
【讨论】:
令人印象深刻!我正在开发一个类似的版本(使用 row_number 等)。我错过了 dense_rank fct。我今天将尝试您的代码,看看它是否适用于我的所有案例(如果一切正常,我不会忘记接受您的回答)。非常感谢。 不幸的是,它不适用于我的数据集。我正在尝试查看错误在哪里。我的数据: df = spark.createDataFrame([\ ("G1", 114, "15"),("G1", 115, "13"),("G1", 116, "13" ),("G1", 117, "13"),("G1", 118, "13"),("G1", 119, "13"),\ ("G1", 120, "13") ,("G1", 121, "13"),("G1", 122, "13"),("G1", 123, "13"),("G1", 124, "13"),( "G1", 125, "13"),\ ("G1", 126, "53"),("G1", 127, "53"),("G1", 128, "-1"),( "G1", 129, "-1"),("G1", 130, "-1"),("G1", 131, "-1"),\ ("G1", 132, "37") ,("G1", 133, "50")],["event","timestamp","tag"]) df = df.withColumn("tag",when(col("tag")=="- 1",lit(None)).otherwise(col("tag"))) 错误是标签重复多次(例如:A、A、A)第 4 列错误。我正在努力解决这个问题。【参考方案2】:代码修改和修复:
df = spark.createDataFrame([\
("G1", 113, "-1"),("G1", 114, "A"),("G1", 115, "A"),("G1", 116, "A"),\
("G1", 117, "B"),("G1", 118, "A"),("G1", 119, "-1"),\
("G1", 120, "A"),("G2", 121, "B"),("G2", 122, "C")],["event","timestamp","tag"])
df = df.withColumn("tag",when(col("tag")=="-1",lit(None)).otherwise(col("tag")))
window_trip = Window.partitionBy('event').orderBy('timestamp')
df = df.withColumn('in_out', when(\
(row_number().over(window_trip)>1) &
( ( (col('tag').isNull()) & (lag('tag').over(window_trip).isNotNull())) \
| ( (col('tag').isNotNull()) & (lag('tag').over(window_trip).isNull()) \
) \
),1) \
.otherwise(0))
df = df.withColumn('group', sum('in_out').over(window_trip))
df = df.withColumn('tag_change', ((( (col('tag')!=lag('tag').over(window_trip)) ) | (row_number().over(window_trip)==1))).cast("int") )
df = df.withColumn('tag_rank', sum('tag_change').over(window_trip) )
window2 = Window.partitionBy('event','group').orderBy('tag_rank')
df = df.withColumn('stretch', when(col('tag').isNull(),0).otherwise(dense_rank().over(window2)))
df.sort(["event", "timestamp"], ascending=[1, 1]).show()
再次感谢@AliYesilli,您给了我提示和dense_rank fct!
【讨论】:
以上是关于在 Pyspark 中,如何在 partitionBy 和 orderBy 之后进行分组?的主要内容,如果未能解决你的问题,请参考以下文章