PySpark 中是不是有 .any() 等价物?
Posted
技术标签:
【中文标题】PySpark 中是不是有 .any() 等价物?【英文标题】:Is there a .any() equivalent in PySpark?PySpark 中是否有 .any() 等价物? 【发布时间】:2021-03-09 17:19:50 【问题描述】:我想知道是否有办法在 Pyspark 中使用.any()
?
我在 Python 中有以下代码,它实质上是在子集数据帧中搜索感兴趣的特定列,如果这些列中的任何一个包含 "AD"
,我们不想处理它们。
这是 Python 中的代码:
index_list = [
df.query("id == @id").index
for trial in unique_trial_id_list
if ~(df.query("id == @trial")["unit"].str.upper().str.contains("AD").any()]
这是 Pandas 中的示例数据框。
ID=1
具有与之关联的字符串'AD'
,因此我们希望将其排除在处理之外。但是,ID=2
没有与之关联的字符串 'AD'
,因此我们希望将其包含在进一步的处理中。
data = [
[1, "AD"],
[1, "BC"],
[1, "DE"],
[1, "FG"],
[2, "XY"],
[2, "BC"],
[2, "DE"],
[2, "FG"],
]
df = pd.DataFrame(data, columns=["ID", "Code"])
df
问题是我不知道如何在 PySpark 中执行此等效功能。我已经能够对子集进行列表理解,并且能够使用 contains('AD')
进行子集化,但是当涉及到 any
部分时,我被卡住了。
我想出的 PySpark 代码:
id = id_list[0]
test = sdf.select(["ID", "Codes"]).filter(spark_fns.col("ID") == id).filter(~spark_fns.col("Codes").str.contains("AD"))
【问题讨论】:
【参考方案1】:您可以使用窗口函数(如果至少有一个真值,则布尔值的最大值为真):
from pyspark.sql import functions as F, Window
df1 = df.withColumn(
"to_exclude",
~F.max(F.when(F.col("Code") == "AD", True).otherwise(False)).over(Window.partitionBy("ID"))
).filter(
F.col("to_exclude")
).drop("to_exclude")
df1.show()
# +---+----+
# | ID|Code|
# +---+----+
# | 2| XY|
# | 2| BC|
# | 2| DE|
# | 2| FG|
# +---+----+
或groupby id
并使用max
函数和when
表达式过滤Code
列中包含AD
的id,然后与原始df 加入:
from pyspark.sql import functions as F
filter_df = df.groupBy("id").agg(
F.max(F.when(F.col("Code") == "AD", True).otherwise(False)).alias("to_exclude")
).filter(F.col("to_exclude"))
df1 = df.join(filter_df, ["id"], "left_anti")
在Spark 3+中,还有一个函数any
:
from pyspark.sql import functions as F
filter_df = df.groupBy("id").agg(
F.expr("any(Code = 'AD')").alias("to_exclude")
).filter(F.col("to_exclude"))
df1 = df.join(filter_df, ["id"], "left_anti")
【讨论】:
【参考方案2】:您也可以试试这个 - 1) 查找 Code 列包含 AD 的所有 ID 值,2) 通过左反连接过滤掉具有此类 ID 的行
df = pd.DataFrame(data, columns=["ID", "Code"])
df_psp = spark.createDataFrame(df)
cols = ["ID"]
df_filter = df_psp.filter(F.col("Code").like('%AD%')).select(cols).distinct()
df_out = df_psp.join(df_filter, cols, "left_anti")
【讨论】:
以上是关于PySpark 中是不是有 .any() 等价物?的主要内容,如果未能解决你的问题,请参考以下文章
PySpark 中 pandas.cut() 的等价物是啥?