按重叠列分区时的高效 Spark 数据集操作

Posted

技术标签:

【中文标题】按重叠列分区时的高效 Spark 数据集操作【英文标题】:Efficient spark dataset operations when partitioned by overlapping columns 【发布时间】:2018-03-08 20:40:55 【问题描述】:

我有一个如下所示的数据集(“guid”、“timestamp”、“agt”)

val df = List(Test("a", "1", null),
   Test("b", "2", "4"),
   Test("a", "1", "3"),
   Test("b", "2", "4"),
   Test("c", "1", "3"),
   Test("a", "6", "8"),
   Test("b", "2", "4"),
   Test("a", "1", "4")

我需要计算

按 guid 分组时每行的最小时间戳。 按(guid、时间戳)分组时每个键的计数 行的 agtM 按 guid 分组并按时间戳 (desc) 排序然后取第一个非空 agt else "" 删除重复项

所以输出将如下所示。

+----+---------+---+-------+-----+----+
|guid|timestamp|agt|minimum|count|agtM|
+----+---------+---+-------+-----+----+
|   c|        1|  3|      1|    1|   3|
|   b|        2|  4|      2|    3|   4|
|   a|        1|   |      1|    3|   8|
|   a|        6|  8|      1|    1|   8|
+----+---------+---+-------+-----+----+

我试过了

val w = Window.partitionBy($"guid")

    val w1 = Window.partitionBy($"guid", $"timestamp")
    val w2 = Window.partitionBy($"guid").orderBy($"timestamp".desc).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    val gg = df.toDS()
      .withColumn("minimum", min("timestamp").over(w))
      .withColumn("count", count("*").over(w1))
      .withColumn("agtM", coalesce(first($"agt", true).over(w2), lit("")))
      .dropDuplicates("guid", "timestamp")

不过,我对 agtM 计算不是很自信。我的目标是实现最小的改组,因为在这种情况下,我们首先按 guid 分组,然后按(guid,时间戳)分组,从逻辑上讲,第二个分组应该发生在第一个创建的分区中。然后输出按 guid 分组并与另一个表连接。这两个数据都非常庞大(以 TB 为单位),因此希望通过最少的改组来实现这一点,并且不想稍后在 mapGroups 中移动计算(我可以简单地通过使用非空代理时间和 maxBy 过滤组来完成 agtM 计算时间戳)。您能否建议实现上述目标的最佳方法?

编辑

agtM 计算已修复。为了给前面的操作提供更多的上下文,输出和另一个数据集(一个额外的字段,我们在输出中保持虚拟)的联合将需要按键分组以产生最终结果。我也在考虑在每个分区(mapPartitions)内计算这些值(除了窗口w),然后将每个分区内的列表作为另一个列表并进行进一步计算。

【问题讨论】:

您的w2 窗口规范似乎没有做任何与您列出的agtM 要求相关的事情,这应该用第一个非空agt 在降序中前向填充agtM时间戳顺序。但是,您预期的8 对应于"" 的输出似乎表明您实际上想用最后一个非空agt 回填? 这行| a| 1| | 1| 3| 8|不应该是| a| 1| 3| 1| 2| 4|吗? @LeoCyou 是对的。我想在遍历列表以获取按时间戳排序的 guid 时回填最后一个非空 agt。另一种计算方法是df.toDS().filter(_.agt != "").groupByKey(r => r.guid).mapGroups((a, b) => val agtMObject = b.maxBy(p => p.timestamp) TestWithagtM(agtMObject.guid, agtMObject.timestamp, agtMObject.agt, agtMObject.agtM) )@RameshMaharjan(a, 1, "") 是输入,最后一列应该是 8,因为如果你 groupBy a 然后按时间戳 desc 排序,那么 8 是对应的 agt 6将成为agtM 【参考方案1】:

要用最后一个非空的agt 值回填agtM,您可以将last("agt", ignoreNulls)rowsBetween() 一起用于w2

val ds = Seq(
  ("a", "1", ""),
  ("b", "2", "4"),
  ("a", "1", "3"),
  ("b", "2", "4"),
  ("c", "1", "3"),
  ("a", "6", "8"),
  ("b", "2", "4"),
  ("a", "1", "4")
).toDF("guid", "timestamp", "agt").
  as[(String, String, String)]

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"guid")
val w1 = Window.partitionBy($"guid", $"timestamp")
val w2 = Window.partitionBy($"guid").orderBy($"timestamp".desc).
  rowsBetween(Window.unboundedPreceding, 0)

ds.
  withColumn("minimum", min("timestamp").over(w)).
  withColumn("count", count("*").over(w1)).
  withColumn("agt", when($"agt" =!= "", $"agt")).
  withColumn("agtM", last("agt", ignoreNulls = true).over(w2)).
  na.fill("", Seq("agt")).
  dropDuplicates("guid", "timestamp").
  show
// +----+---------+---+-------+-----+----+
// |guid|timestamp|agt|minimum|count|agtM|
// +----+---------+---+-------+-----+----+
// |   c|        1|  3|      1|    1|   3|
// |   b|        2|  4|      2|    3|   4|
// |   a|        1|   |      1|    3|   8|
// |   a|        6|  8|      1|    1|   8|
// +----+---------+---+-------+-----+----+

鉴于您的每个窗口规格 ww1w2 都有自己的特定要求,我不确定可以做多少来减少洗牌。尽管您打算创建的结果数据集似乎很适合使用窗口函数,但您可以探索非窗口方法。

【讨论】:

这不适用于以下数据集。 val ds = Seq( ("a", "1", "8"), ("b", "2", "4"), ("a", "1", "3"), ("b", "2", "4"), ("c", "1", ""), ("a", "6", ""), ("b", "2", "4"), ("a", "1", "4") ).toDF("guid", "timestamp", "agt"). as[(String, String, String)] 输出:|guid|timestamp|agt|minimum|count|agtM| +----+---------+---+-------+-----+----+ | c| 1| | 1| 1|null| | b| 2| 4| 2| 3| 4| | a| 1| 8| 1| 3| 8| | a| 6| | 1| 1|null| 这就是结果应该是的,因为新样本数据中的空值位于分区内的第一行,并且回填不会(并且逻辑上不应该)跨越窗口规范中定义的分区边界。 在这种情况下,指定的 WindowSpec 是错误的(我不确定为什么 WindowsSpec 不能考虑整个边界,如果这不合逻辑,我可能不会称之为回填),因为它没有产生想要的结果。所需的是(请查看原帖中更新的 WnSpec)|guid|timestamp| agt|minimum|count|agtM| +----+---------+----+-------------+-----+--------+ | c| 1|null| 1| 1| | | b| 2| 4| 2| 3| 4| | a| 1| 8| 1| 3| 8| | a| 6|null| 1| 1| 8| 窗口函数在单独的窗口分区中运行。如果您的要求可以放宽为采用最接近的上一个或下一个非空值,则将行范围扩大到 (unboundedPreceding, unboundedFollowing) 将达到目的。在这种情况下,使用first 还是last 可能并不重要。【参考方案2】:

我需要计算 按 guid 分组时每行的最小时间戳。 按(guid、时间戳)分组时每个键的计数 按 guid 分组并按时间戳(desc)排序时的行 agtM 然后取第一个非空 agt else ""

查看您的要求,您需要计算 guid 组上的最小时间戳、agtM(最新)以及按 guid 和时间戳分组时的计数。这些要求表明您需要三个分组和三个改组

第一次分组和洗牌——求计数

val dfWithCount = df
      .groupBy("guid", "timestamp")
      .agg(count("guid").as("count"))

第二次和第三次分组洗牌

最新的 agt 即 agtM 可以使用 Window 函数找到,最小时间戳 可以使用另一个 groupByaggregation 找到

val dfWithMinAndMax = df.withColumn("agtM", first("agt").over(windowSpec))
      .groupBy("guid", "agtM")
      .agg(min("timestamp").as("minimum")
      )

最后你join 两个数据框

val finalDF = dfWithCount.join(dfWithMinAndMax, Seq("guid"))

这会给你正确的数据帧 但没有agt

+----+---------+-----+----+-------+
|guid|timestamp|count|agtM|minimum|
+----+---------+-----+----+-------+
|c   |1        |1    |3   |1      |
|b   |2        |3    |4   |2      |
|a   |1        |3    |8   |1      |
|a   |6        |1    |8   |1      |
+----+---------+-----+----+-------+

我猜agt 不是那么重要,但如果你真的需要它,那么你需要另一个 groupingshufflingjoin

val dfWithAgt = df.groupBy("guid", "timestamp").agg(min("agt").as("agt"))

finalDF.join(dfWithAgt, Seq("guid", "timestamp"))

这会给你

+----+---------+-----+----+-------+---+
|guid|timestamp|count|agtM|minimum|agt|
+----+---------+-----+----+-------+---+
|c   |1        |1    |3   |1      |3  |
|b   |2        |3    |4   |2      |4  |
|a   |1        |3    |8   |1      |   |
|a   |6        |1    |8   |1      |8  |
+----+---------+-----+----+-------+---+

可以使用select 完成列顺序。

希望回答对你有帮助

【讨论】:

您的解决方案给出了正确的结果。不幸的是,我在最终输出中也需要 agt(它将在后续操作中使用)。 我在最后一部分中也使用了 agt :) 你没看到 @subhodip 吗? 我确实看到了.. 我不确定,但你能告诉我这是否比原帖中的解决方案更有效吗?这看起来性能较差,但我可能是错的。我同意有不同的方法可以达到相同的效果,但是是否可以通过 2 次或更少的改组来获得相同的结果?如果我按 guid 进行分区,然后调用 mapPartitions 并将列表转换为另一个数据集(已经由 guid 映射,即使可能存在多个 guid),然后计算分区内的 agtM 和最小值怎么办? 是的@subhodip,你是绝对正确的。洗牌越多,解决方案的效率就越低。【参考方案3】:

最初通过 guid 对其进行分区,然后使用迭代器在逻辑上会减少洗牌。如果每个组内的数据很大,不确定效果。

df.toDS().groupByKey(_.guid).flatMapGroups((a,b) => 
          val list = b.toList
          val minimum = list.minBy(_.timestamp).timestamp
          val filteredList = list.filterNot(_.agt == "")
          val agtM = if(filteredList.isEmpty) "" else filteredList.maxBy(_.timestamp).agt
          list.groupBy(_.timestamp).map(r => (r._2.head.guid, r._1, r._2.head.agt, minimum, r._2.length, agtM))
        ).select($"_1".as("guid"), $"_2".as("timestamp"),
          $"_3".as("agt"), $"_4".as("minimum"), $"_5".as("count"), $"_6".as("agtM")).show()

【讨论】:

以上是关于按重叠列分区时的高效 Spark 数据集操作的主要内容,如果未能解决你的问题,请参考以下文章

Spark:保存按“虚拟”列分区的 DataFrame

Spark RDD理解

有没有办法在分区的 spark 数据集上并行运行操作?

SparkSQL:如何在从数据库加载数据集时指定分区列

Spark RDD 详细介绍

Spark 数据集写入 2 个不同的目录