pyspark中同一列的多个AND条件没有连接操作

Posted

技术标签:

【中文标题】pyspark中同一列的多个AND条件没有连接操作【英文标题】:Multiple AND conditions on the same column in pyspark without join operation 【发布时间】:2019-02-28 19:15:29 【问题描述】:

我有一个三列表格[s,p,o]。我想删除行,对于 s 中的每个条目, p 列都不包含 [P625, P36] 值。例如

+----+----+------
|   s|   p|  o  |
+----+----+-----|
| Q31| P36| Q239|
| Q31|P625|   51|
| Q45| P36| Q597|
| Q45|P625|  123|
| Q51|P625|   22|
| Q24|P625|   56|

最终结果应该是

+----+----+------
|   s|   p|  o  |
+----+----+-----|
| Q31| P36| Q239|
| Q31|P625|   51|
| Q45| P36| Q597|
| Q45|P625|  123|

使用join操作,上面的任务很简单。

df.filter(df.p=='P625').join(df.filter(df.p=='P36'),'s')

但是有没有更优雅的方法来做到这一点?

【问题讨论】:

我没有用过这个技术,但是你可以看看它是否允许你做 df.filter(df.p=='P265' || df.p=='P36')你可能会在这里找到一些东西:***.com/a/35882046/2793683 @pault 和 dmoore1181 两个建议的查询都不会删除给定示例中原始表的最后三行。 不是重复的,我不是针对同一行的不同列的 @user1848018 我明白你现在的意思了。我认为你必须在这种情况下加入。可能还有其他方法,但连接可能是最有效的。 join() 也不会以您的最终结果示例结束。我认为您可能必须将po 转换为单个列struct(),然后执行.groupBy().agg().filter(),然后执行.flatMap() 以获得最终结果示例. 【参考方案1】:

你需要一个窗口

from pyspark.sql import Window
from pyspark.sql.functions import *

winSpec = Window.partitionBy('s')
df.withColumn("s_list", collect_list("s").over(winSpec)).
filter(array_contains(col("s_list"), "P625") & array_contains(col("s_list"), "P36") & size(col("s_list")) = 2)

【讨论】:

【参考方案2】:

请原谅,因为我对 Scala API 更熟悉,但也许您可以轻松转换它:

scala> val df = spark.createDataset(Seq(
     |      ("Q31", "P36", "Q239"),
     |      ("Q31", "P625", "51"),
     |      ("Q45", "P36", "Q597"),
     |      ("Q45", "P625", "123"),
     |      ("Q51", "P625", "22"),
     |      ("Q24", "P625", "56")
     | )).toDF("s", "p", "o")
df: org.apache.spark.sql.DataFrame = [s: string, p: string ... 1 more field]

scala> (df.select($"s", struct($"p", $"o").as("po"))
     |   .groupBy("s")
     |   .agg(collect_list($"po").as("polist"))
     |   .as[(String, Array[(String, String)])]
     |   .flatMap(r => 
     |     val ps = r._2.map(_._1).toSet
     |           if(ps("P625") && ps("P36")) 
     |             r._2.flatMap(po => Some(r._1, po._1, po._2))
     |            else 
     |             None
     |           
     |   ).toDF("s", "p", "o")
     |   .show())
+---+----+----+                                                                 
|  s|   p|   o|
+---+----+----+
|Q31| P36|Q239|
|Q31|P625|  51|
|Q45| P36|Q597|
|Q45|P625| 123|
+---+----+----+

作为参考,您上面的 join() 命令会返回:

scala> df.filter($"p" === "P625").join(df.filter($"p" === "P36"), "s").show
+---+----+---+---+----+
|  s|   p|  o|  p|   o|
+---+----+---+---+----+
|Q31|P625| 51|P36|Q239|
|Q45|P625|123|P36|Q597|
+---+----+---+---+----+

这也可以用于您的最终解决方案,也许代码更少,但我不确定哪种方法更有效,因为这在很大程度上取决于数据。

【讨论】:

以上是关于pyspark中同一列的多个AND条件没有连接操作的主要内容,如果未能解决你的问题,请参考以下文章

如何在pyspark和sql的一个数据框中应用多个条件并附加到同一个表

如何在 pyspark.sql.functions.when() 中使用多个条件?

在pyspark中加入2个表,多个条件,左连接?

使用pyspark中的条件创建具有运行总量的列

Pyspark - 获取具有条件的列的累积总和

多列上的pyspark条件并返回新列