在 Pyspark 中,如何在 partitionBy 和 orderBy 之后进行分组?

Posted

技术标签:

【中文标题】在 Pyspark 中,如何在 partitionBy 和 orderBy 之后进行分组?【英文标题】:In Pyspark, how to group after a partitionBy and orderBy? 【发布时间】:2018-10-15 15:34:16 【问题描述】:

我有一个包含事件的 Spark 数据帧 (Pyspark 2.2.0),每个事件都有一个时间戳。还有一个包含一系列标签(A、B、C 或 Null)的附加列。我想为每一行计算 - 按事件组,按时间戳排序 - 当前最长的非 Null 标签更改的计数(Null 应将此计数重置为 0)。带有我理想计算列的 df 示例,称为stretch:

event timestamp   tag    stretch
G1    09:59:00    Null   0
G1    10:00:00    A      1  ---> first non Null tag starts the count
G1    10:01:00    A      1  ---> no change of tag
G1    10:02:00    B      2  ---> change of tag (A to B)
G1    10:03:00    A      3  ---> change of tag (B to A)
G1    10:04:00    Null   0  ---> Null resets the count
G1    10:05:00    A      1  ---> first non Null tag restarts the count

G2    10:00:00    B      1  ---> first non Null tag starts the count
G2    10:01:00    C      2  ---> change of tag (B to C)

在 Pyspark 中我可以这样定义一个窗口:

window = Window.partitionBy("event").orderBy(col("timestamp").asc())

并计算例如标签的变化:

df = df.withColumn("change_of_tag",col("tag")!=lag("tag",1).over(window))

但我找不到如何计算每次遇到 Null 标记时都会重置的这些更改的累积总和。我怀疑我应该定义一个按事件和标签类型(Null 或非 Null)分区的新窗口,但我不知道如何按事件分区,按时间戳排序,然后按标签类型分组。

【问题讨论】:

您是否尝试过使用 .groupBy 并基于 aggs 求和? 你能添加预期的结果吗? @AliYesilli 预期结果已经存在,是拉伸列,如果不清楚,请见谅。 @Prazy 问题是我不知道如何分组 after 一个 orderby (我必须按事件分组,按时间戳排序,然后按非 null 分组/空标签)。 【参考方案1】:

我认为这是一个非常棘手的案例。特别是“标签不变”的情况很难在一个过程中处理。所以你可以在下面找到我的解决方案。我必须创建一些新的计算列来获得结果

>>> import pyspark.sql.functions as F
>>> from pyspark.sql.window import Window
>>> 
>>> df.show()
+-----+---------+----+
|event|timestamp| tag|
+-----+---------+----+
|   G1| 09:59:00|null|
|   G1| 10:00:00|   A|
|   G1| 10:01:00|   A|
|   G1| 10:02:00|   B|
|   G1| 10:03:00|   A|
|   G1| 10:04:00|null|
|   G1| 10:05:00|   A|
|   G2| 10:00:00|   B|
|   G2| 10:01:00|   C|
+-----+---------+----+

>>> df = df.withColumn('new_col1', F.when(F.isnull('tag'),1).otherwise(0))
>>> 
>>> window1 = Window.partitionBy('event').orderBy('timestamp')
>>> 
>>> df = df.withColumn('new_col2', F.row_number().over(window1))
>>> df = df.withColumn('new_col3', F.lag('tag').over(window1))
>>> df = df.withColumn('new_col4', F.lag('new_col2').over(window1))
>>> df = df.withColumn('new_col4', F.when(df['new_col3']==df['tag'],df['new_col4']).otherwise(df['new_col2']))
>>> df = df.withColumn('new_col5', F.sum('new_col1').over(window1))
>>> df = df.withColumn('new_col5', F.when(F.isnull('tag'),None).otherwise(df['new_col5']))
>>> 
>>> window2 = Window.partitionBy('event','new_col5').orderBy('new_col4')
>>> 
>>> df = df.withColumn('new_col6', F.when(F.isnull('tag'),0).otherwise(F.dense_rank().over(window2)))
>>> df = df.select('event','timestamp','tag', df['new_col6'].alias('stretch'))
>>> 
>>> df.sort(["event", "timestamp"], ascending=[1, 1]).show()
+-----+---------+----+-------+                                                  
|event|timestamp| tag|stretch|
+-----+---------+----+-------+
|   G1| 09:59:00|null|      0|
|   G1| 10:00:00|   A|      1|
|   G1| 10:01:00|   A|      1|
|   G1| 10:02:00|   B|      2|
|   G1| 10:03:00|   A|      3|
|   G1| 10:04:00|null|      0|
|   G1| 10:05:00|   A|      1|
|   G2| 10:00:00|   B|      1|
|   G2| 10:01:00|   C|      2|
+-----+---------+----+-------+

【讨论】:

令人印象深刻!我正在开发一个类似的版本(使用 row_number 等)。我错过了 dense_rank fct。我今天将尝试您的代码,看看它是否适用于我的所有案例(如果一切正常,我不会忘记接受您的回答)。非常感谢。 不幸的是,它不适用于我的数据集。我正在尝试查看错误在哪里。我的数据: df = spark.createDataFrame([\ ("G1", 114, "15"),("G1", 115, "13"),("G1", 116, "13" ),("G1", 117, "13"),("G1", 118, "13"),("G1", 119, "13"),\ ("G1", 120, "13") ,("G1", 121, "13"),("G1", 122, "13"),("G1", 123, "13"),("G1", 124, "13"),( "G1", 125, "13"),\ ("G1", 126, "53"),("G1", 127, "53"),("G1", 128, "-1"),( "G1", 129, "-1"),("G1", 130, "-1"),("G1", 131, "-1"),\ ("G1", 132, "37") ,("G1", 133, "50")],["event","timestamp","tag"]) df = df.withColumn("tag",when(col("tag")=="- 1",lit(None)).otherwise(col("tag"))) 错误是标签重复多次(例如:A、A、A)第 4 列错误。我正在努力解决这个问题。【参考方案2】:

代码修改和修复:

df = spark.createDataFrame([\
        ("G1", 113, "-1"),("G1", 114, "A"),("G1", 115, "A"),("G1", 116, "A"),\ 
        ("G1", 117, "B"),("G1", 118, "A"),("G1", 119, "-1"),\
        ("G1", 120, "A"),("G2", 121, "B"),("G2", 122, "C")],["event","timestamp","tag"])

df = df.withColumn("tag",when(col("tag")=="-1",lit(None)).otherwise(col("tag")))

window_trip = Window.partitionBy('event').orderBy('timestamp')

df = df.withColumn('in_out', when(\
        (row_number().over(window_trip)>1) & 
(  ( (col('tag').isNull()) &     (lag('tag').over(window_trip).isNotNull())) \
        | ( (col('tag').isNotNull()) &  (lag('tag').over(window_trip).isNull()) \
        ) \
    ),1) \
    .otherwise(0))
df = df.withColumn('group', sum('in_out').over(window_trip))
df = df.withColumn('tag_change', ((( (col('tag')!=lag('tag').over(window_trip)) ) | (row_number().over(window_trip)==1))).cast("int")  )
df = df.withColumn('tag_rank', sum('tag_change').over(window_trip) )
window2 = Window.partitionBy('event','group').orderBy('tag_rank')
df = df.withColumn('stretch', when(col('tag').isNull(),0).otherwise(dense_rank().over(window2)))
df.sort(["event", "timestamp"], ascending=[1, 1]).show()

再次感谢@AliYesilli,您给了我提示和dense_rank fct!

【讨论】:

以上是关于在 Pyspark 中,如何在 partitionBy 和 orderBy 之后进行分组?的主要内容,如果未能解决你的问题,请参考以下文章

[Pyspark]RDD常用方法总结

在 pyspark 中过滤 Hive 分区表

熊猫平行适用于考拉(pyspark)

PySpark|RDD编程基础

Pyspark 无法在 hive 中创建镶木地板表

如何在分区之间均匀分布值? (“反分区”)