如何使用 pyspark 设置动态 where 子句
Posted
技术标签:
【中文标题】如何使用 pyspark 设置动态 where 子句【英文标题】:How to set a dynamic where clause using pyspark 【发布时间】:2021-05-11 17:04:44 【问题描述】:我有一个数据集,其中有多个组。我有一个排名列,它递增计数每组的每个条目。这种结构的一个例子如下所示:
+-----------+---------+---------+
| equipment| run_id|run_order|
+-----------+---------+---------+
|1 |430032589| 1|
|1 |430332632| 2|
|1 |430563033| 3|
|1 |430785715| 4|
|1 |431368577| 5|
|1 |431672148| 6|
|2 |435497596| 1|
|1 |435522469| 7|
每个组(设备)都有不同的运行次数。如上所示,设备 1 有 7 个运行,而设备 2 有 1 个运行。我想为每台设备选择第一次和最后一次 n 次运行。选择前 n 次运行很简单:
df.select("equipment", "run_id").distinct().where(df.run_order <= n).orderBy("equipment").show()
不同之处在于查询,因为每一行都相当于一个时间步长,因此每一行都将记录与该时间步长相关联的传感器读数。因此会有很多行具有相同的设备,run_id和run_order,应该保留在最终结果中,而不是聚合。
由于每个设备的运行次数是唯一的,我无法使用 where 子句(我认为)执行等效的选择查询来获取最后 n 次运行:
df.select("equipment", "run_id").distinct().where(df.rank >= total_runs - n).orderBy("equipment").show()
我可以运行 groupBy 以获得每个设备的最高 run_order
+-----------+----------------+
| equipment| max(run_order) |
+-----------+----------------+
|1 | 7|
|2 | 1|
但我不确定是否有办法构建一个像这样工作的动态 where 子句。这样我就可以获得最后 n 次运行(包括每次运行的所有时间步长数据)。
【问题讨论】:
【参考方案1】:您可以为每个设备添加最高等级的列,并根据该列进行过滤:
from pyspark.sql import functions as F, Window
n = 3
df2 = df.withColumn(
'max_run',
F.max('run_order').over(Window.partitionBy('equipment'))
).where(F.col('run_order') >= F.col('max_run') - n)
【讨论】:
以上是关于如何使用 pyspark 设置动态 where 子句的主要内容,如果未能解决你的问题,请参考以下文章
如何使用带有 PySpark 的 WHERE 子句在 AWS Glue 中查询 JDBC 数据库?
在 SQL 表上使用 pyspark 编写 where 查询
导入pyspark ETL模块并使用python子进程作为子进程运行时出错