Spark: Row filter based on Column value

Posted: 2019-06-26 15:57:35

【Question】:

I have millions of rows in a DataFrame like this:

val df = Seq(("id1", "ACTIVE"), ("id1", "INACTIVE"), ("id1", "INACTIVE"), ("id2", "ACTIVE"), ("id3", "INACTIVE"), ("id3", "INACTIVE")).toDF("id", "status")

scala> df.show(false)
+---+--------+
|id |status  |
+---+--------+
|id1|ACTIVE  |
|id1|INACTIVE|
|id1|INACTIVE|
|id2|ACTIVE  |
|id3|INACTIVE|
|id3|INACTIVE|
+---+--------+

Now I want to split this data into three separate DataFrames:

- ids that are only ever ACTIVE (e.g. id2), say activeDF
- ids that are only ever INACTIVE (e.g. id3), say inactiveDF
- ids that have both ACTIVE and INACTIVE statuses, say bothDF

How can I compute activeDF and inactiveDF?

I know that bothDF can be computed like this:

df.select("id").distinct.except(activeDF).except(inactiveDF)

but this will involve shuffling (since distinct operations are required). Is there a better way to compute bothDF?

Versions:

Spark : 2.2.1
Scala : 2.11

【Question Comments】:

【Answer 1】:

The most elegant solution is to pivot on status:

val counts = df
  .groupBy("id")
  .pivot("status", Seq("ACTIVE", "INACTIVE"))
  .count

or an equivalent direct agg:

val counts = df
  .groupBy("id")
  .agg(
    count(when($"status" === "ACTIVE", true)) as "ACTIVE",
    count(when($"status" === "INACTIVE", true)) as "INACTIVE"
  )

followed by a simple CASE ... WHEN:

val result = counts.withColumn(
  "status",
  when($"ACTIVE" === 0, "INACTIVE")
    .when($"inactive" === 0, "ACTIVE")
    .otherwise("BOTH")
)

result.show
+---+------+--------+--------+
| id|ACTIVE|INACTIVE|  status|
+---+------+--------+--------+
|id3|     0|       2|INACTIVE|
|id1|     1|       2|    BOTH|
|id2|     1|       0|  ACTIVE|
+---+------+--------+--------+

Later you can split result with filters, or dump it to disk using a source that supports partitionBy (How to split a dataframe into dataframes with same column values?).
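A minimal sketch of that last step (the output path in the partitionBy variant is hypothetical):

val activeDF   = result.filter($"status" === "ACTIVE").select("id")
val inactiveDF = result.filter($"status" === "INACTIVE").select("id")
val bothDF     = result.filter($"status" === "BOTH").select("id")

// or write each status group to its own directory in a single pass
result.write.partitionBy("status").parquet("/tmp/ids_by_status")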

【Comments】:

Thanks, it works, but the code posted by Gadipally performs better.

【Answer 2】:

Just another way: groupBy and collect the statuses into a set; if the size of the set is 1, the id is either only ACTIVE or only INACTIVE, otherwise it has both.

scala> val df = Seq(("id1", "ACTIVE"), ("id1", "INACTIVE"), ("id1", "INACTIVE"), ("id2", "ACTIVE"), ("id3", "INACTIVE"), ("id3", "INACTIVE"), ("id4", "ACTIVE"), ("id5", "ACTIVE"), ("id6", "INACTIVE"), ("id7", "ACTIVE"), ("id7", "INACTIVE")).toDF("id", "status")
df: org.apache.spark.sql.DataFrame = [id: string, status: string]

scala> df.show(false)
+---+--------+
|id |status  |
+---+--------+
|id1|ACTIVE  |
|id1|INACTIVE|
|id1|INACTIVE|
|id2|ACTIVE  |
|id3|INACTIVE|
|id3|INACTIVE|
|id4|ACTIVE  |
|id5|ACTIVE  |
|id6|INACTIVE|
|id7|ACTIVE  |
|id7|INACTIVE|
+---+--------+


scala> val allstatusDF = df.groupBy("id").agg(collect_set("status") as "allstatus")
allstatusDF: org.apache.spark.sql.DataFrame = [id: string, allstatus: array<string>]

scala> allstatusDF.show(false)
+---+------------------+
|id |allstatus         |
+---+------------------+
|id7|[ACTIVE, INACTIVE]|
|id3|[INACTIVE]        |
|id5|[ACTIVE]          |
|id6|[INACTIVE]        |
|id1|[ACTIVE, INACTIVE]|
|id2|[ACTIVE]          |
|id4|[ACTIVE]          |
+---+------------------+


scala> allstatusDF.withColumn("status", when(size($"allstatus") === 1, $"allstatus".getItem(0)).otherwise("BOTH")).show(false)
+---+------------------+--------+
|id |allstatus         |status  |
+---+------------------+--------+
|id7|[ACTIVE, INACTIVE]|BOTH    |
|id3|[INACTIVE]        |INACTIVE|
|id5|[ACTIVE]          |ACTIVE  |
|id6|[INACTIVE]        |INACTIVE|
|id1|[ACTIVE, INACTIVE]|BOTH    |
|id2|[ACTIVE]          |ACTIVE  |
|id4|[ACTIVE]          |ACTIVE  |
+---+------------------+--------+

【Comments】:

Thanks @Gadipally. The operation executes with a single ShuffledRowRDD.
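As a quick check (a sketch assuming the allstatusDF defined above), the physical plan should show a single Exchange, i.e. one shuffle, for the groupBy:

scala> allstatusDF.explain()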
