如何根据pyspark中的条件组合dataFrame中的行

Posted

技术标签:

【中文标题】如何根据pyspark中的条件组合dataFrame中的行【英文标题】:how to combine rows in a dataFrame based on a condition in pyspark 【发布时间】:2018-08-29 03:33:39 【问题描述】:

我必须为应用程序处理包含日志(进入和退出)的数据框 数据如下:

USER | DATETIME        | IN_OUT
---------------------------------
0002  2018/08/28 12:00   IN

0002  2018/08/28 12:20   OUT

0003  2018/08/28 13:00   IN

0003  2018/08/28 14:20   OUT

0003  2018/08/28 15:00   IN

0003  2018/08/28 16:00   OUT

如何将包含 2 个会话的行合并生成

USER | DATETIMEIN       | DATETIMEOUT        | SESSIONTIME[Minutes]
-------------------------------------------------------
0002    2018/08/28 12:00   2018/08/28 12:20        20

0003    2018/08/28 13:00   2018/08/28 14:30        90

0003    2018/08/28 15:00   2018/08/28 16:00        60

【问题讨论】:

你能分享你的输入数据框的架构和你尝试过的方法吗? 【参考方案1】:

如果你能确保一个 IN 后面总是跟着一个 OUT 事件,你可以使用下面的代码(我包含了一个对 IN 和 OUT 的检查,但如果 IN 和 OUT 不交替,它将不起作用)。

from pyspark.sql.window import Window as W
test_df = spark.createDataFrame([
    (2,datetime.datetime(2018,8,28,12,00), "IN"),(2,datetime.datetime(2018,8,28,12,20), "OUT"),(3,datetime.datetime(2018,8,28,13,00), "IN"),(3,datetime.datetime(2018,8,28,14,20), "OUT"),(3,datetime.datetime(2018,8,28,15,00), "IN"),(3,datetime.datetime(2018,8,28,16,00), "OUT")
    ], ("USER", "DATETIME", "IN_OUT")) # creation of Dataframe

w = W.partitionBy("USER").orderBy("DATETIME") #order by datetime and process every user separately
get_in= when((lag("IN_OUT", 1).over(w) == "IN") & (col("IN_OUT")=="OUT"), lag("DATETIME",1).over(w)).otherwise(None) # apply the window and if the previous event was IN preserve the time

test_df.withColumn("DATETIMEIN",get_in.cast("timestamp")).withColumn("DATETIMEOUT",col("DATETIME")).filter((col("DATETIMEIN").isNotNull())).withColumn("SESSIONTIME[Minutes]",(col("DATETIME").cast("long")-col("DATETIMEIN").cast("long"))/60).select("USER","DATETIMEIN", "DATETIMEOUT", "SESSIONTIME[Minutes]").show() #apply the function and compute the difference to previous IN_TIME

结果:

+----+-------------------+-------------------+--------------------+
|USER|         DATETIMEIN|        DATETIMEOUT|SESSIONTIME[Minutes]|
+----+-------------------+-------------------+--------------------+
|   3|2018-08-28 13:00:00|2018-08-28 14:20:00|                80.0|
|   3|2018-08-28 15:00:00|2018-08-28 16:00:00|                60.0|
|   2|2018-08-28 12:00:00|2018-08-28 12:20:00|                20.0|
+----+-------------------+-------------------+--------------------+

【讨论】:

以上是关于如何根据pyspark中的条件组合dataFrame中的行的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 中,如何根据另一个 DataFrame 中的查找来填充新列?

检查 Pyspark Dataframe 中的重复项

Pyspark Dataframe Imputations - 根据指定条件用列平均值替换未知和缺失值

PySpark DataFrame 根据另一列中时间戳值的最小/最大条件更新列值

如何在 Pyspark Dataframe 中创建多列的所有成对组合?

Pyspark:如何在不同条件的数据框中创建列