pyspark 按相应条件过滤行

Posted

技术标签:

【中文标题】pyspark 按相应条件过滤行【英文标题】:pyspark filtering rows by corresponding condition 【发布时间】:2021-11-11 17:12:02 【问题描述】:

假设我有两张桌子:

df_1:
| condition | date           |
| --------  | -------------- |
| A         | 2018-01-01     |
| A         | 2018-01-02     |
| A         | 2018-01-03     |
| B         | 2018-04-04     |
| B         | 2018-04-05     |
| B         | 2018-04-06     |

df_2: 
| condition | date           |
| --------  | -------------- |
| A         | 2018-01-01     |
| B         | 2018-04-05     |

我想按表 2 中的日期过滤表 1,这样我只保留 df_1 的条目,即日期大于其在 df_2 中的相应日期,这是预期的输出:

| condition | date           |
| --------  | -------------- |
| A         | 2018-01-02     |
| A         | 2018-01-03     |
| B         | 2018-04-06     |

在 pandas 中执行此操作的一种方法是遍历 df_2 中的行

all_dfs=[]
for idx,row in df_2.iterrows():
    filtered_df = df_1[(df_1['condition']==row['condition'])&(df_1['date']>row['date'])]
    all_dfs.append(filtered_df)
final_df = pd.concat(all_dfs, axis=0)

如何在不涉及 for 循环的 pyspark 中执行此操作?

【问题讨论】:

使用左反连接:df_1.join(df_2, (df_1['condition'] == df_2['condition']) & (df_1['date'] <= df_2['date']), "left_anti") anti join 将失败是df_1 有行`C | 2018-01-01. Basically when the condition is not present in df_2` 将包含在结果中。 left_semi 与 OP 的 pandas 代码行为相同 【参考方案1】:

Spark 有 left_semi 加入这个用例。

示例

from pyspark.sql.types import *
from pyspark.sql import Row
from datetime import datetime


schema = StructType([StructField('condition', StringType()), StructField('date',DateType())])
df_1_rows = [Row("A", datetime.strptime("2018-01-01", "%Y-%m-%d")),Row("A", datetime.strptime("2018-01-02", "%Y-%m-%d")),Row("A", datetime.strptime("2018-01-03", "%Y-%m-%d")),Row("B", datetime.strptime("2018-04-04", "%Y-%m-%d")),Row("B", datetime.strptime("2018-04-05", "%Y-%m-%d")),Row("B", datetime.strptime("2018-04-06", "%Y-%m-%d")),]
df_1 = spark.createDataFrame(df_1_rows, schema)

df_2_rows = [Row("A", datetime.strptime("2018-01-01", "%Y-%m-%d")),Row("B", datetime.strptime("2018-04-05", "%Y-%m-%d"))]
df_2 = spark.createDataFrame(df_2_rows, schema)

df_1.join(df_2, (df_1['condition'] == df_2['condition']) & (df_1['date'] > df_2['date']), "left_semi").show()

输出

+---------+----------+
|condition|      date|
+---------+----------+
|        A|2018-01-02|
|        A|2018-01-03|
|        B|2018-04-06|
+---------+----------+

【讨论】:

以上是关于pyspark 按相应条件过滤行的主要内容,如果未能解决你的问题,请参考以下文章

按逻辑条件过滤 data.frame 行

Pyspark:根据多个条件过滤数据帧

Redshift:获取行的排名,按条件过滤

如何根据pyspark中的行和列条件过滤多行

Pyspark:过滤多列上的行

Pandas Pivot Table:按条件过滤时出错