PySpark 中是不是有 .any() 等价物?

Posted

技术标签:

【中文标题】PySpark 中是不是有 .any() 等价物?【英文标题】:Is there a .any() equivalent in PySpark?PySpark 中是否有 .any() 等价物? 【发布时间】:2021-03-09 17:19:50 【问题描述】:

我想知道是否有办法在 Pyspark 中使用.any()

我在 Python 中有以下代码,它实质上是在子集数据帧中搜索感兴趣的特定列,如果这些列中的任何一个包含 "AD",我们不想处理它们。

这是 Python 中的代码:

index_list = [
    df.query("id == @id").index 
    for trial in unique_trial_id_list 
    if ~(df.query("id == @trial")["unit"].str.upper().str.contains("AD").any()]

这是 Pandas 中的示例数据框。

ID=1 具有与之关联的字符串'AD',因此我们希望将其排除在处理之外。但是,ID=2 没有与之关联的字符串 'AD',因此我们希望将其包含在进一步的处理中。

data = [
    [1, "AD"],
    [1, "BC"],
    [1, "DE"],
    [1, "FG"],
    [2, "XY"],
    [2, "BC"],
    [2, "DE"],
    [2, "FG"],
]
df = pd.DataFrame(data, columns=["ID", "Code"])
df

问题是我不知道如何在 PySpark 中执行此等效功能。我已经能够对子集进行列表理解,并且能够使用 contains('AD') 进行子集化,但是当涉及到 any 部分时,我被卡住了。

我想出的 PySpark 代码:

id = id_list[0] 
test = sdf.select(["ID", "Codes"]).filter(spark_fns.col("ID") == id).filter(~spark_fns.col("Codes").str.contains("AD"))

【问题讨论】:

【参考方案1】:

您可以使用窗口函数(如果至少有一个真值,则布尔值的最大值为真):

from pyspark.sql import functions as F, Window

df1 = df.withColumn(
    "to_exclude",
    ~F.max(F.when(F.col("Code") == "AD", True).otherwise(False)).over(Window.partitionBy("ID"))
).filter(
    F.col("to_exclude")
).drop("to_exclude")

df1.show()
# +---+----+
# | ID|Code|
# +---+----+
# |  2|  XY|
# |  2|  BC|
# |  2|  DE|
# |  2|  FG|
# +---+----+

或groupby id 并使用max 函数和when 表达式过滤Code 列中包含AD 的id,然后与原始df 加入:

from pyspark.sql import functions as F

filter_df = df.groupBy("id").agg(
    F.max(F.when(F.col("Code") == "AD", True).otherwise(False)).alias("to_exclude")
).filter(F.col("to_exclude"))

df1 = df.join(filter_df, ["id"], "left_anti")

在Spark 3+中,还有一个函数any

from pyspark.sql import functions as F

filter_df = df.groupBy("id").agg(
    F.expr("any(Code = 'AD')").alias("to_exclude")
).filter(F.col("to_exclude"))

df1 = df.join(filter_df, ["id"], "left_anti")

【讨论】:

【参考方案2】:

您也可以试试这个 - 1) 查找 Code 列包含 AD 的所有 ID 值,2) 通过左反连接过滤掉具有此类 ID 的行

df = pd.DataFrame(data, columns=["ID", "Code"])
df_psp = spark.createDataFrame(df)

cols = ["ID"]
df_filter = df_psp.filter(F.col("Code").like('%AD%')).select(cols).distinct()
df_out = df_psp.join(df_filter, cols, "left_anti")

【讨论】:

以上是关于PySpark 中是不是有 .any() 等价物?的主要内容,如果未能解决你的问题,请参考以下文章

JPA 中的 @Any 等价于啥?

pyspark 查询的 SQL 等价物

PySpark 中 pandas.cut() 的等价物是啥?

PySpark AND EXISTS 等价于 sql 查询

在 Pyspark HiveContext 中,SQL OFFSET 的等价物是啥?

PySpark 中的 mkString 等价物是啥?