按重叠列分区时的高效 Spark 数据集操作
Posted
技术标签:
【中文标题】按重叠列分区时的高效 Spark 数据集操作【英文标题】:Efficient spark dataset operations when partitioned by overlapping columns 【发布时间】:2018-03-08 20:40:55 【问题描述】:我有一个如下所示的数据集(“guid”、“timestamp”、“agt”)
val df = List(Test("a", "1", null),
Test("b", "2", "4"),
Test("a", "1", "3"),
Test("b", "2", "4"),
Test("c", "1", "3"),
Test("a", "6", "8"),
Test("b", "2", "4"),
Test("a", "1", "4")
我需要计算
按 guid 分组时每行的最小时间戳。 按(guid、时间戳)分组时每个键的计数 行的 agtM 按 guid 分组并按时间戳 (desc) 排序然后取第一个非空 agt else "" 删除重复项所以输出将如下所示。
+----+---------+---+-------+-----+----+
|guid|timestamp|agt|minimum|count|agtM|
+----+---------+---+-------+-----+----+
| c| 1| 3| 1| 1| 3|
| b| 2| 4| 2| 3| 4|
| a| 1| | 1| 3| 8|
| a| 6| 8| 1| 1| 8|
+----+---------+---+-------+-----+----+
我试过了
val w = Window.partitionBy($"guid")
val w1 = Window.partitionBy($"guid", $"timestamp")
val w2 = Window.partitionBy($"guid").orderBy($"timestamp".desc).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
val gg = df.toDS()
.withColumn("minimum", min("timestamp").over(w))
.withColumn("count", count("*").over(w1))
.withColumn("agtM", coalesce(first($"agt", true).over(w2), lit("")))
.dropDuplicates("guid", "timestamp")
不过,我对 agtM 计算不是很自信。我的目标是实现最小的改组,因为在这种情况下,我们首先按 guid 分组,然后按(guid,时间戳)分组,从逻辑上讲,第二个分组应该发生在第一个创建的分区中。然后输出按 guid 分组并与另一个表连接。这两个数据都非常庞大(以 TB 为单位),因此希望通过最少的改组来实现这一点,并且不想稍后在 mapGroups 中移动计算(我可以简单地通过使用非空代理时间和 maxBy 过滤组来完成 agtM 计算时间戳)。您能否建议实现上述目标的最佳方法?
编辑
agtM 计算已修复。为了给前面的操作提供更多的上下文,输出和另一个数据集(一个额外的字段,我们在输出中保持虚拟)的联合将需要按键分组以产生最终结果。我也在考虑在每个分区(mapPartitions)内计算这些值(除了窗口w),然后将每个分区内的列表作为另一个列表并进行进一步计算。
【问题讨论】:
您的w2
窗口规范似乎没有做任何与您列出的agtM
要求相关的事情,这应该用第一个非空agt
在降序中前向填充agtM
时间戳顺序。但是,您预期的8
对应于""
的输出似乎表明您实际上想用最后一个非空agt
回填?
这行| a| 1| | 1| 3| 8|
不应该是| a| 1| 3| 1| 2| 4|
吗?
@LeoCyou 是对的。我想在遍历列表以获取按时间戳排序的 guid 时回填最后一个非空 agt。另一种计算方法是df.toDS().filter(_.agt != "").groupByKey(r => r.guid).mapGroups((a, b) => val agtMObject = b.maxBy(p => p.timestamp) TestWithagtM(agtMObject.guid, agtMObject.timestamp, agtMObject.agt, agtMObject.agtM) )
@RameshMaharjan(a, 1, "") 是输入,最后一列应该是 8,因为如果你 groupBy a 然后按时间戳 desc 排序,那么 8 是对应的 agt 6将成为agtM
【参考方案1】:
要用最后一个非空的agt
值回填agtM
,您可以将last("agt", ignoreNulls)
与rowsBetween()
一起用于w2
:
val ds = Seq(
("a", "1", ""),
("b", "2", "4"),
("a", "1", "3"),
("b", "2", "4"),
("c", "1", "3"),
("a", "6", "8"),
("b", "2", "4"),
("a", "1", "4")
).toDF("guid", "timestamp", "agt").
as[(String, String, String)]
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy($"guid")
val w1 = Window.partitionBy($"guid", $"timestamp")
val w2 = Window.partitionBy($"guid").orderBy($"timestamp".desc).
rowsBetween(Window.unboundedPreceding, 0)
ds.
withColumn("minimum", min("timestamp").over(w)).
withColumn("count", count("*").over(w1)).
withColumn("agt", when($"agt" =!= "", $"agt")).
withColumn("agtM", last("agt", ignoreNulls = true).over(w2)).
na.fill("", Seq("agt")).
dropDuplicates("guid", "timestamp").
show
// +----+---------+---+-------+-----+----+
// |guid|timestamp|agt|minimum|count|agtM|
// +----+---------+---+-------+-----+----+
// | c| 1| 3| 1| 1| 3|
// | b| 2| 4| 2| 3| 4|
// | a| 1| | 1| 3| 8|
// | a| 6| 8| 1| 1| 8|
// +----+---------+---+-------+-----+----+
鉴于您的每个窗口规格 w
、w1
和 w2
都有自己的特定要求,我不确定可以做多少来减少洗牌。尽管您打算创建的结果数据集似乎很适合使用窗口函数,但您可以探索非窗口方法。
【讨论】:
这不适用于以下数据集。val ds = Seq( ("a", "1", "8"), ("b", "2", "4"), ("a", "1", "3"), ("b", "2", "4"), ("c", "1", ""), ("a", "6", ""), ("b", "2", "4"), ("a", "1", "4") ).toDF("guid", "timestamp", "agt"). as[(String, String, String)]
输出:|guid|timestamp|agt|minimum|count|agtM| +----+---------+---+-------+-----+----+ | c| 1| | 1| 1|null| | b| 2| 4| 2| 3| 4| | a| 1| 8| 1| 3| 8| | a| 6| | 1| 1|null|
这就是结果应该是的,因为新样本数据中的空值位于分区内的第一行,并且回填不会(并且逻辑上不应该)跨越窗口规范中定义的分区边界。
在这种情况下,指定的 WindowSpec 是错误的(我不确定为什么 WindowsSpec 不能考虑整个边界,如果这不合逻辑,我可能不会称之为回填),因为它没有产生想要的结果。所需的是(请查看原帖中更新的 WnSpec)|guid|timestamp| agt|minimum|count|agtM| +----+---------+----+-------------+-----+--------+ | c| 1|null| 1| 1| | | b| 2| 4| 2| 3| 4| | a| 1| 8| 1| 3| 8| | a| 6|null| 1| 1| 8|
窗口函数在单独的窗口分区中运行。如果您的要求可以放宽为采用最接近的上一个或下一个非空值,则将行范围扩大到 (unboundedPreceding, unboundedFollowing) 将达到目的。在这种情况下,使用first
还是last
可能并不重要。【参考方案2】:
我需要计算 按 guid 分组时每行的最小时间戳。 按(guid、时间戳)分组时每个键的计数 按 guid 分组并按时间戳(desc)排序时的行 agtM 然后取第一个非空 agt else ""
查看您的要求,您需要计算 guid 组上的最小时间戳、agtM(最新)以及按 guid 和时间戳分组时的计数。这些要求表明您需要三个分组和三个改组。
第一次分组和洗牌——求计数
val dfWithCount = df
.groupBy("guid", "timestamp")
.agg(count("guid").as("count"))
第二次和第三次分组洗牌
最新的 agt 即 agtM 可以使用 Window
函数找到,最小时间戳 可以使用另一个 groupBy
和 aggregation
找到
val dfWithMinAndMax = df.withColumn("agtM", first("agt").over(windowSpec))
.groupBy("guid", "agtM")
.agg(min("timestamp").as("minimum")
)
最后你join
两个数据框
val finalDF = dfWithCount.join(dfWithMinAndMax, Seq("guid"))
这会给你正确的数据帧 但没有agt
+----+---------+-----+----+-------+
|guid|timestamp|count|agtM|minimum|
+----+---------+-----+----+-------+
|c |1 |1 |3 |1 |
|b |2 |3 |4 |2 |
|a |1 |3 |8 |1 |
|a |6 |1 |8 |1 |
+----+---------+-----+----+-------+
我猜agt
不是那么重要,但如果你真的需要它,那么你需要另一个 grouping 和 shuffling 和 join
val dfWithAgt = df.groupBy("guid", "timestamp").agg(min("agt").as("agt"))
finalDF.join(dfWithAgt, Seq("guid", "timestamp"))
这会给你
+----+---------+-----+----+-------+---+
|guid|timestamp|count|agtM|minimum|agt|
+----+---------+-----+----+-------+---+
|c |1 |1 |3 |1 |3 |
|b |2 |3 |4 |2 |4 |
|a |1 |3 |8 |1 | |
|a |6 |1 |8 |1 |8 |
+----+---------+-----+----+-------+---+
可以使用select
完成列顺序。
希望回答对你有帮助
【讨论】:
您的解决方案给出了正确的结果。不幸的是,我在最终输出中也需要 agt(它将在后续操作中使用)。 我在最后一部分中也使用了 agt :) 你没看到 @subhodip 吗? 我确实看到了.. 我不确定,但你能告诉我这是否比原帖中的解决方案更有效吗?这看起来性能较差,但我可能是错的。我同意有不同的方法可以达到相同的效果,但是是否可以通过 2 次或更少的改组来获得相同的结果?如果我按 guid 进行分区,然后调用 mapPartitions 并将列表转换为另一个数据集(已经由 guid 映射,即使可能存在多个 guid),然后计算分区内的 agtM 和最小值怎么办? 是的@subhodip,你是绝对正确的。洗牌越多,解决方案的效率就越低。【参考方案3】:最初通过 guid 对其进行分区,然后使用迭代器在逻辑上会减少洗牌。如果每个组内的数据很大,不确定效果。
df.toDS().groupByKey(_.guid).flatMapGroups((a,b) =>
val list = b.toList
val minimum = list.minBy(_.timestamp).timestamp
val filteredList = list.filterNot(_.agt == "")
val agtM = if(filteredList.isEmpty) "" else filteredList.maxBy(_.timestamp).agt
list.groupBy(_.timestamp).map(r => (r._2.head.guid, r._1, r._2.head.agt, minimum, r._2.length, agtM))
).select($"_1".as("guid"), $"_2".as("timestamp"),
$"_3".as("agt"), $"_4".as("minimum"), $"_5".as("count"), $"_6".as("agtM")).show()
【讨论】:
以上是关于按重叠列分区时的高效 Spark 数据集操作的主要内容,如果未能解决你的问题,请参考以下文章