pyspark中同一列的多个AND条件没有连接操作
Posted
技术标签:
【中文标题】pyspark中同一列的多个AND条件没有连接操作【英文标题】:Multiple AND conditions on the same column in pyspark without join operation 【发布时间】:2019-02-28 19:15:29 【问题描述】:我有一个三列表格[s,p,o]
。我想删除行,对于 s 中的每个条目, p 列都不包含 [P625, P36]
值。例如
+----+----+------
| s| p| o |
+----+----+-----|
| Q31| P36| Q239|
| Q31|P625| 51|
| Q45| P36| Q597|
| Q45|P625| 123|
| Q51|P625| 22|
| Q24|P625| 56|
最终结果应该是
+----+----+------
| s| p| o |
+----+----+-----|
| Q31| P36| Q239|
| Q31|P625| 51|
| Q45| P36| Q597|
| Q45|P625| 123|
使用join操作,上面的任务很简单。
df.filter(df.p=='P625').join(df.filter(df.p=='P36'),'s')
但是有没有更优雅的方法来做到这一点?
【问题讨论】:
我没有用过这个技术,但是你可以看看它是否允许你做 df.filter(df.p=='P265' || df.p=='P36')你可能会在这里找到一些东西:***.com/a/35882046/2793683 @pault 和 dmoore1181 两个建议的查询都不会删除给定示例中原始表的最后三行。 不是重复的,我不是针对同一行的不同列的 @user1848018 我明白你现在的意思了。我认为你必须在这种情况下加入。可能还有其他方法,但连接可能是最有效的。join()
也不会以您的最终结果示例结束。我认为您可能必须将p
和o
转换为单个列struct()
,然后执行.groupBy()
、.agg()
、.filter()
,然后执行.flatMap()
以获得最终结果示例.
【参考方案1】:
你需要一个窗口
from pyspark.sql import Window
from pyspark.sql.functions import *
winSpec = Window.partitionBy('s')
df.withColumn("s_list", collect_list("s").over(winSpec)).
filter(array_contains(col("s_list"), "P625") & array_contains(col("s_list"), "P36") & size(col("s_list")) = 2)
【讨论】:
【参考方案2】:请原谅,因为我对 Scala API 更熟悉,但也许您可以轻松转换它:
scala> val df = spark.createDataset(Seq(
| ("Q31", "P36", "Q239"),
| ("Q31", "P625", "51"),
| ("Q45", "P36", "Q597"),
| ("Q45", "P625", "123"),
| ("Q51", "P625", "22"),
| ("Q24", "P625", "56")
| )).toDF("s", "p", "o")
df: org.apache.spark.sql.DataFrame = [s: string, p: string ... 1 more field]
scala> (df.select($"s", struct($"p", $"o").as("po"))
| .groupBy("s")
| .agg(collect_list($"po").as("polist"))
| .as[(String, Array[(String, String)])]
| .flatMap(r =>
| val ps = r._2.map(_._1).toSet
| if(ps("P625") && ps("P36"))
| r._2.flatMap(po => Some(r._1, po._1, po._2))
| else
| None
|
| ).toDF("s", "p", "o")
| .show())
+---+----+----+
| s| p| o|
+---+----+----+
|Q31| P36|Q239|
|Q31|P625| 51|
|Q45| P36|Q597|
|Q45|P625| 123|
+---+----+----+
作为参考,您上面的 join()
命令会返回:
scala> df.filter($"p" === "P625").join(df.filter($"p" === "P36"), "s").show
+---+----+---+---+----+
| s| p| o| p| o|
+---+----+---+---+----+
|Q31|P625| 51|P36|Q239|
|Q45|P625|123|P36|Q597|
+---+----+---+---+----+
这也可以用于您的最终解决方案,也许代码更少,但我不确定哪种方法更有效,因为这在很大程度上取决于数据。
【讨论】:
以上是关于pyspark中同一列的多个AND条件没有连接操作的主要内容,如果未能解决你的问题,请参考以下文章
如何在pyspark和sql的一个数据框中应用多个条件并附加到同一个表