如何使用 pyspark 设置动态 where 子句

Posted

技术标签:

【中文标题】如何使用 pyspark 设置动态 where 子句【英文标题】:How to set a dynamic where clause using pyspark 【发布时间】:2021-05-11 17:04:44 【问题描述】:

我有一个数据集,其中有多个组。我有一个排名列,它递增计数每组的每个条目。这种结构的一个例子如下所示:

+-----------+---------+---------+
|  equipment|   run_id|run_order|
+-----------+---------+---------+
|1          |430032589|        1|
|1          |430332632|        2|
|1          |430563033|        3|
|1          |430785715|        4|
|1          |431368577|        5|
|1          |431672148|        6|
|2          |435497596|        1|
|1          |435522469|        7|

每个组(设备)都有不同的运行次数。如上所示,设备 1 有 7 个运行,而设备 2 有 1 个运行。我想为每台设备选择第一次和最后一次 n 次运行。选择前 n 次运行很简单:

df.select("equipment", "run_id").distinct().where(df.run_order <= n).orderBy("equipment").show()

不同之处在于查询,因为每一行都相当于一个时间步长,因此每一行都将记录与该时间步长相关联的传感器读数。因此会有很多行具有相同的设备,run_id和run_order,应该保留在最终结果中,而不是聚合。

由于每个设备的运行次数是唯一的,我无法使用 where 子句(我认为)执行等效的选择查询来获取最后 n 次运行:

df.select("equipment", "run_id").distinct().where(df.rank >= total_runs - n).orderBy("equipment").show()

我可以运行 groupBy 以获得每个设备的最高 run_order

+-----------+----------------+
|  equipment| max(run_order) |
+-----------+----------------+
|1          |               7|
|2          |               1|

但我不确定是否有办法构建一个像这样工作的动态 where 子句。这样我就可以获得最后 n 次运行(包括每次运行的所有时间步长数据)。

【问题讨论】:

【参考方案1】:

您可以为每个设备添加最高等级的列,并根据该列进行过滤:

from pyspark.sql import functions as F, Window

n = 3

df2 = df.withColumn(
    'max_run', 
    F.max('run_order').over(Window.partitionBy('equipment'))
).where(F.col('run_order') >= F.col('max_run') - n)

【讨论】:

以上是关于如何使用 pyspark 设置动态 where 子句的主要内容,如果未能解决你的问题,请参考以下文章

如何使用带有 PySpark 的 WHERE 子句在 AWS Glue 中查询 JDBC 数据库?

在 SQL 表上使用 pyspark 编写 where 查询

导入pyspark ETL模块并使用python子进程作为子进程运行时出错

如何仅使用在不使用动态SQL的情况下检查的复选框,将WHERE子句设置为在多个位列上进行过滤?

如何在 Pyspark 的动态列列表中转义列名

如何访问pyspark数据框中的动态列