PySpark dataframe: drop duplicate values with older timestamp

Posted 2017-09-07 17:45:00

Question:

I have a pyspark dataframe that contains starttime and stoptime columns, plus other columns whose values get updated over time:

|startime  |stoptime  |hour  |minute  |sec  |sip          |dip            |sport|dport|proto|pkt |byt |
|1504766585|1504801216|16    |20      |16   |192.168.0.11 |23.204.108.58  |51249|80   |6    |0   |0   |
|1504766585|1504801216|16    |20      |16   |192.168.0.11 |23.204.108.58  |51249|80   |6    |0   |0   |
|1504781751|1504801216|16    |20      |16   |192.168.0.11 |23.72.38.96    |51252|80   |6    |0   |0   |
|1504781751|1504801216|16    |20      |16   |192.168.0.11 |23.72.38.96    |51252|80   |6    |0   |0   |
|1504766585|1504801336|16    |22      |16   |192.168.0.11 |23.204.108.58  |51249|80   |6    |0   |0   |
|1504766585|1504801336|16    |22      |16   |192.168.0.11 |23.204.108.58  |51249|80   |6    |0   |0   |
|1504781751|1504801336|16    |22      |16   |192.168.0.11 |23.72.38.96    |51252|80   |6    |0   |0   |
|1504781751|1504801336|16    |22      |16   |192.168.0.11 |23.72.38.96    |51252|80   |6    |0   |0   |

In this example, I want to select all rows with the most recent stoptime; all the other column values are duplicated.


Answer 1:

I am guessing you want to keep the most recent record for each sport. You should use a window function to find the latest record in each partition:

import pyspark.sql.functions as psf
from pyspark.sql import Window

# rank the rows of each sport partition by descending stoptime
w = Window.partitionBy("sport").orderBy(psf.desc("stoptime"))

# keep only the most recent row in each partition
df.withColumn("rn", psf.row_number().over(w)).filter("rn = 1").drop("rn")

    +----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
    |  startime|  stoptime|hour|min|sec|         sip|          dip|sport|dport|proto|pkt|byt|
    +----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
    |1504781751|1504801336|  16| 22| 16|192.168.0.11|  23.72.38.96|51252|   80|    6|  0|  0|
    |1504766585|1504801336|  16| 22| 16|192.168.0.11|23.204.108.58|51249|   80|    6|  0|  0|
    +----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+

You end up with as many records as there are distinct values of sport.
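A quick way to check that claim (a minimal sketch; result is just a hypothetical name for the deduplicated dataframe produced by the snippet above):

result = df.withColumn("rn", psf.row_number().over(w)).filter("rn = 1").drop("rn")

# one surviving row per distinct sport value
assert result.count() == df.select("sport").distinct().count()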

If you want the most recent stoptime over the whole table, without partitioning, you can drop the partitionBy and use dense_rank instead (identical values get the same rank):

# without a partition, every row of the table is ranked by descending stoptime
w = Window.orderBy(psf.desc("stoptime"))

df.withColumn("rn", psf.dense_rank().over(w)).filter("rn = 1").drop("rn").show()

    +----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
    |  startime|  stoptime|hour|min|sec|         sip|          dip|sport|dport|proto|pkt|byt|
    +----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
    |1504766585|1504801336|  16| 22| 16|192.168.0.11|23.204.108.58|51249|   80|    6|  0|  0|
    |1504766585|1504801336|  16| 22| 16|192.168.0.11|23.204.108.58|51249|   80|    6|  0|  0|
    |1504781751|1504801336|  16| 22| 16|192.168.0.11|  23.72.38.96|51252|   80|    6|  0|  0|
    |1504781751|1504801336|  16| 22| 16|192.168.0.11|  23.72.38.96|51252|   80|    6|  0|  0|
    +----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+

Comments:

I want the rows that share the same startime, hour, min, sec, sip, dip, sport, dport, proto but have the most recent stoptime. That in turn will give me the largest byt and pkt.

If you need the maximum stoptime for each distinct startime, hour, min, sec, sip, dip, sport, dport, proto, you just need to put all of those columns in the partitionBy.

Does this look right? import pyspark.sql.functions as psf from pyspark.sql import Window w = Window.partitionBy("startime","hour","sip","dip","sport","dport","proto").orderBy(psf.desc("stoptime")) df = dataframe.withColumn("rn", psf.row_number().over(w)).filter("rn = 1").drop("rn") df.show()

Looks good; check the output and see whether it matches your expectations.
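For completeness, here is a sketch of the suggestion from the comments above, with every identifying column in the partition (it assumes the column names from the question and that dataframe holds the original data; it is not part of the original answer):

import pyspark.sql.functions as psf
from pyspark.sql import Window

# partition on all the identifying columns and sort newest stoptime first
w = Window.partitionBy("startime", "hour", "min", "sec", "sip", "dip",
                       "sport", "dport", "proto").orderBy(psf.desc("stoptime"))

# keep only the most recent row of each group; that row also carries the
# pkt and byt values associated with the latest stoptime
latest = dataframe.withColumn("rn", psf.row_number().over(w)).filter("rn = 1").drop("rn")
latest.show()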
Answer 2:

from pyspark.sql.functions import col

df = sc.parallelize([(1504766585,1504801216,16,20,16,'192.168.0.11','23.204.108.58',51249,80,6,0,0),
                     (1504766585,1504801216,16,20,16,'192.168.0.11','23.204.108.58',51249,80,6,0,0),
                     (1504781751,1504801216,16,20,16,'192.168.0.11','23.72.38.96',  51252,80,6,0,0),
                     (1504781751,1504801216,16,20,16,'192.168.0.11','23.72.38.96',  51252,80,6,0,0),
                     (1504766585,1504801336,16,22,16,'192.168.0.11','23.204.108.58',51249,80,6,0,0),
                     (1504766585,1504801336,16,22,16,'192.168.0.11','23.204.108.58',51249,80,6,0,0),
                     (1504781751,1504801336,16,22,16,'192.168.0.11','23.72.38.96',  51252,80,6,0,0),
                     (1504781751,1504801336,16,22,16,'192.168.0.11','23.72.38.96',  51252,80,6,0,0)]).\
    toDF(["startime","stoptime","hour","min","sec","sip","dip","sport","dport","proto","pkt","byt"])

# the maximum stoptime is returned to the driver as a Python int and used as the filter value
df1 = df.where(col("stoptime") == df.select("stoptime").rdd.max()[0]).distinct()
df1.show()

The output is:

+----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
|  startime|  stoptime|hour|min|sec|         sip|          dip|sport|dport|proto|pkt|byt|
+----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
|1504766585|1504801336|  16| 22| 16|192.168.0.11|23.204.108.58|51249|   80|    6|  0|  0|
|1504781751|1504801336|  16| 22| 16|192.168.0.11|  23.72.38.96|51252|   80|    6|  0|  0|
+----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+

Comments:

Just added (Python) code highlighting - look for ... at the start of the code.

@user2825083 If this answers your query, you should mark it as the correct answer, since it will help others if they run into a similar problem in the future. Thanks!

Collecting pyspark values into Python objects can give unexpected results for float, because they are approximated differently. I would not recommend doing this.

@Marie If you don't mind, would you explain a bit more? I can't find any non-Spark object being used here.

df.select("stoptime").rdd.max()[0] is a Python int.
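If you prefer to keep the comparison entirely inside Spark rather than materializing the value as a Python object, here is a sketch prompted by the comment above (not part of the original answer): compute the maximum stoptime as a one-row dataframe and apply it with a join.

from pyspark.sql import functions as F

# max(stoptime) stays a one-row dataframe; the join keeps only the matching rows
max_stop = df.agg(F.max("stoptime").alias("stoptime"))
df1 = df.join(max_stop, on="stoptime").distinct()
df1.show()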
