Spark SQL 二次过滤和分组

Posted

技术标签:

【中文标题】Spark SQL 二次过滤和分组【英文标题】:Spark SQL secondary filtering and grouping 【发布时间】:2015-12-18 21:49:51 【问题描述】:

问题:我有一个数据集 A filed1, field2, field3...,我想先将 A 分组为 field1,然后在每个结果中组,我想做一堆子查询,例如,计算具有field2 == true 的行数,或者计算具有field4 == "some_value"field5 == false 的不同field3 的数量等。

我能想到的一些替代方案:我可以编写一个自定义的用户定义聚合函数,该函数接受一个计算过滤条件的函数,但这样我必须为每个查询条件。我也看过countDistinct函数可以实现一些操作,但是不知道怎么用它来实现filter-distinct-count语义。

在 Pig 中,我可以做到:

FOREACH (GROUP A by field1) 
        field_a = FILTER A by field2 == TRUE;
        field_b = FILTER A by field4 == 'some_value' AND field5 == FALSE;
        field_c = DISTINCT field_b.field3;

        GENERATE  FLATTEN(group),
                  COUNT(field_a) as fa,
                  COUNT(field_b) as fb,
                  COUNT(field_c) as fc,

有没有办法在 Spark SQL 中做到这一点?

【问题讨论】:

【参考方案1】:

排除不同的计数可以通过简单的条件求和来解决:

import org.apache.spark.sql.functions.sum

val df = sc.parallelize(Seq(
  (1L, true, "x", "foo", true), (1L, true, "y", "bar", false), 
  (1L, true, "z", "foo", true), (2L, false, "y", "bar", false), 
  (2L, true, "x", "foo", false)
)).toDF("field1", "field2", "field3", "field4", "field5")

val left = df.groupBy($"field1").agg(
  sum($"field2".cast("int")).alias("fa"),
  sum(($"field4" === "foo" && ! $"field5").cast("int")).alias("fb")
)
left.show

// +------+---+---+
// |field1| fa| fb|
// +------+---+---+
// |     1|  3|  0|
// |     2|  1|  1|
// +------+---+---+

不幸的是要棘手得多。 Spark SQL doesn't physically group data 中的 GROUP BY 子句。更不用说寻找不同的元素是相当昂贵的。您可以做的最好的事情可能是分别计算不同的计数并简单地加入结果:

val right = df.where($"field4" === "foo" && ! $"field5")
  .select($"field1".alias("field1_"), $"field3")
  .distinct
  .groupBy($"field1_")
  .agg(count("*").alias("fc"))

val joined = left
  .join(right, $"field1" === $"field1_", "leftouter")
  .na.fill(0)

使用 UDAF 计算每个条件的不同值绝对是一种选择,但有效的实现将相当棘手。从内部表示转换相当昂贵,使用集合存储实现快速 UDAF 也不便宜。如果您可以接受近似解决方案,则可以在那里使用布隆过滤器。

【讨论】:

经过一番谷歌搜索后,我发现您可以避免使用 case when then 进行连接,因此在您的示例中,SELECT field1, SUM(CAST(field2 AS INT)) AS fa, SUM(CAST(field4 = "foo" AND NOT field5 AS INT)) AS fb, COUNT(DISTINCT(CASE WHEN field4 = "foo" AND NOT field5 THEN field3 END)) AS fc FROM df GROUP BY field1 产生相同的最终结果,而且应该更高效

以上是关于Spark SQL 二次过滤和分组的主要内容,如果未能解决你的问题,请参考以下文章

如何在spark中分组后根据值进行过滤

Spark SQL 中分组依据和窗口函数如何交互?

Spark sql分组和总和更改列名?

SQL - 在两个日期之间分组和过滤

Spark Python/SQL - 如何使用独特的组合进行分组

Apache Spark SQL 按范围分组数据