Spark窗口分区功能需要永远完成
Posted
技术标签:
【中文标题】Spark窗口分区功能需要永远完成【英文标题】:Spark window partition function taking forever to complete 【发布时间】:2020-10-08 14:29:14 【问题描述】:给定一个数据框,我试图计算过去 30 天内我看到 emailId 的次数。我的函数中的主要逻辑如下:
val new_df = df
.withColumn("transaction_timestamp", unix_timestamp($"timestamp").cast(LongType))
val winSpec = Window
.partitionBy("email")
.orderBy(col("transaction_timestamp"))
.rangeBetween(-NumberOfSecondsIn30Days, Window.currentRow)
val resultDF = new_df
.filter(col("condition"))
.withColumn("count", count(col("email")).over(winSpec))
配置:
spark.executor.cores=5
所以,我可以看到 5 个具有窗口功能的阶段,其中一些阶段完成得非常快(在几秒钟内),还有 2 个甚至在 3 小时内都没有完成,最后被卡住了少量任务(进展非常缓慢):
这对我来说是一个数据倾斜的问题,如果我从数据集中删除所有包含 5 个最高频率 email
ids 的行,工作很快就会完成(不到 5 分钟)。
如果我尝试在 window partitionBy 中使用其他键,作业将在几分钟内完成:
Window.partitionBy("email", "date")
但显然,如果我这样做,它会执行错误的计数计算,这不是一个可接受的解决方案。
我尝试了各种其他的 spark 设置,以增加内存、内核、并行度等,但似乎都没有帮助。
Spark 版本:2.2
当前 Spark 配置:
-执行器-内存:100G
-executor-cores: 5
-驱动内存:80G
-spark.executor.memory=100g
使用每台具有 16 核、128 GB 内存的机器。最大节点数最多 500 个。
解决这个问题的正确方法是什么?
更新:只是为了提供更多上下文,这里是原始数据帧和相应的计算数据帧:
val df = Seq(
("a@gmail.com", "2019-10-01 00:04:00"),
("a@gmail.com", "2019-11-02 01:04:00"),
("a@gmail.com", "2019-11-22 02:04:00"),
("a@gmail.com", "2019-11-22 05:04:00"),
("a@gmail.com", "2019-12-02 03:04:00"),
("a@gmail.com", "2020-01-01 04:04:00"),
("a@gmail.com", "2020-03-11 05:04:00"),
("a@gmail.com", "2020-04-05 12:04:00"),
("b@gmail.com", "2020-05-03 03:04:00")
).toDF("email", "transaction_timestamp")
val expectedDF = Seq(
("a@gmail.com", "2019-10-01 00:04:00", 1),
("a@gmail.com", "2019-11-02 01:04:00", 1), // prev one falls outside of last 30 days win
("a@gmail.com", "2019-11-22 02:04:00", 2),
("a@gmail.com", "2019-11-22 05:04:00", 3),
("a@gmail.com", "2019-12-02 03:04:00", 3),
("a@gmail.com", "2020-01-01 04:04:00", 1),
("a@gmail.com", "2020-03-11 05:04:00", 1),
("a@gmail.com", "2020-04-05 12:04:00", 2),
("b@gmail.com", "2020-05-03 03:04:00", 1) // new email
).toDF("email", "transaction_timestamp", count")
【问题讨论】:
您能否提及您的机器规格。 ?总内核数和内存? 【参考方案1】:您的某些分区可能太大,这是因为对于某些电子邮件,一个月内的数据太多。
要解决此问题,您可以创建一个仅包含电子邮件和时间戳的新数据框。然后,您按电子邮件和时间戳分组,计算行数并在希望少得多的数据上计算窗口。如果时间戳倾向于重复,即df.count
远大于df.select("email", "timestamp").distinct.count
,则计算将加快。如果不是这种情况,您可以以损失一些精度为代价截断时间戳。这样,您无需计算过去 30 天内发生的次数(给定或花费一秒,因为时间戳以秒为单位),您将计算给定或花费一分钟或一小时甚至一天的发生次数,具体取决于您的需要。你会失去一点精度,但会大大加快计算速度。而且您提供的精度越高,您获得的速度就越快。
代码如下所示:
// 3600 means hourly precision.
// Set to 60 for minute precision, 1 for second precision, 24*3600 for one day.
// Note that even precisionLoss = 1 might make you gain speed depending on
// the distribution of your data
val precisionLoss = 3600
val win_size = NumberOfSecondsIn30Days / precisionLoss
val winSpec = Window
.partitionBy("email")
.orderBy("truncated_timestamp")
.rangeBetween(-win_size, Window.currentRow)
val new_df = df.withColumn("truncated_timestamp",
unix_timestamp($"timestamp") / 3600 cast "long")
val counts = new_df
.groupBy("email", "truncated_timestamp")
.count
.withColumn("count", sum('count) over winSpec)
val result = new_df
.join(counts, Seq("email", "truncated_timestamp"))
【讨论】:
非常感谢@Oli,这个解决方案很棒,但我仍然想知道为什么当我们失去一些精度时它运行得更快。是因为它能够进一步分区数据吗?我发现完全没有关于窗口函数如何在幕后工作的文档。 查看物理查询计划,与没有精度损失时相比,它看起来需要回溯的行数要少得多。这对我来说很有意义。 windows 在后台工作的方式非常简单。当您通过电子邮件进行分区时,数据会通过电子邮件重新分区,因此会有一个昂贵的洗牌。然后,每封电子邮件都关联一个任务,由一台机器上的一个核心处理。任务执行窗口计算。因此,如果一封电子邮件的数据比其他电子邮件多得多,则相应的任务将永远持续下去,整个工作也会如此,只是等待一位执行者完成他的工作。 这就是为什么我的第一个想法是减小窗口的大小,但结果令人失望。在这里,窗口计算更快有两个原因。数据框的列更少,因此记录更小(我们只保留两列,也许你有很多列,这使得计算更慢)。但主要是,我们通过聚合首先靠近的时间戳来减小窗口的大小。我们减小了窗口大小和倾斜度。 如果您还有更多类似的问题,请随时联系我。我真的很喜欢做这个,尽管我对第一个未能提高性能感到非常失望;-)【参考方案2】:你是对的,这是一个数据倾斜问题,减小窗口大小会很有帮助。要仅获取有关过去 30 天的信息,您无需一直到时间开始。再一次,如果建立一个带有时间索引的窗口,在每个窗口开始时计算将是错误的,因为它无法访问前一个窗口。
我的建议是构建一个每30天递增一次的索引和两个大小为60天的重叠窗口,如下图所示:
要了解其工作原理,让我们考虑如图所示的带有index=2
的数据点。如果您有一个大小为 30 天的窗口,则它需要访问其窗口内和前一个窗口内的数据。那是不可能的。这就是我们构建更大的窗口以便我们可以访问所有数据的原因。如果我们考虑win1
,我们会遇到与大小为 30 天的索引相同的问题。但是,如果我们考虑win2
,则所有数据都在索引1的窗口中可用。
对于索引为 3 的点,我们将使用 win1
。对于索引为 4 的点,win2
等。基本上,对于偶数索引,我们使用 win2
。对于奇数索引,我们使用win1
。这种方法将大大减少最大分区大小,从而减少单个任务中处理的最大数据量。
代码只是上面解释的翻译:
val winSize = NumberOfSecondsIn30Days
val win1 = Window
.partitionBy("email", "index1")
.orderBy(col("transaction_timestamp"))
.rangeBetween(-winSize, Window.currentRow)
val win2 = Window
.partitionBy("email", "index2")
.orderBy(col("transaction_timestamp"))
.rangeBetween(-winSize, Window.currentRow)
val indexed_df = new_df
// the group by is only there in case there are duplicated timestamps,
// so as to lighten the size of the windows
.groupBy("email", "transaction_timestamp")
.count()
.withColumn("index",
'transaction_timestamp / winSize cast "long")
.withColumn("index1",
('transaction_timestamp / (winSize * 2)) cast "long")
.withColumn("index2",
(('transaction_timestamp + winSize) / (winSize * 2)) cast "long")
val result = indexed_df
.withColumn("count", when(('index mod 2) === 0, sum('count) over win2)
.otherwise(sum('count) over win1))
【讨论】:
感谢@Oli,我不确定我是否遵循 winSize 逻辑对时间戳的划分。请查看我添加到问题陈述中的示例数据框。你的代码还能用吗? 我意识到我忘记了窗口声明。我编辑了我的答案。它产生与您的代码相同的结果。 顺便说一句,您的示例中有错误。对于这个时间戳"2020-01-01 04:04:00"
,您期望2
。但之前的时间戳是"2019-12-02 03:04:00"
,即 30 天零 1 小时前。因此它在窗口之外,正确的结果是1
。
关于第1个关注点,'index
表示名为索引的列,如col("index")
或$"index"
。关于第 2 个问题,做与其他问题相同的事情,如果我理解正确,请将 count('*)
替换为 count(when(condition, '*))
。
你能提供indexed_df.join( df.groupBy("email").count.orderBy('count desc).limit(1).select("email"), Seq("email")).groupBy("index").count.orderBy('count desc).show
的结果吗?目标是确定具有最大数据量的电子邮件的月份分布。【参考方案3】:
我们仍然可以为此避免使用 Window
对于上面提到的df
val df2 = df.withColumn("timestamp", unix_timestamp($"transaction_timestamp").cast(LongType))
val df3 = df2.withColumnRenamed("timestamp","timestamp_2").drop("transaction_timestamp")
val finalCountDf = df2.join(df3,Seq("email"))
.withColumn("is_within_30", when( $"timestamp" - $"timestamp_2" < NumberOfSecondsIn30Days && $"timestamp" - $"timestamp_2" > 0 , 1).otherwise(0))
.groupBy("email","transaction_timestamp").agg(sum("is_within_30") as "count")
.withColumn("count",$"count"+1)
finalCountDf.orderBy("transaction_timestamp").show
/*
+-----------+---------------------+-----+
| email|transaction_timestamp|count|
+-----------+---------------------+-----+
|a@gmail.com| 2019-10-01 00:04:00| 1|
|a@gmail.com| 2019-11-02 01:04:00| 1|
|a@gmail.com| 2019-11-22 02:04:00| 2|
|a@gmail.com| 2019-11-22 05:04:00| 3|
|a@gmail.com| 2019-12-02 03:04:00| 3|
|a@gmail.com| 2020-01-01 04:04:00| 1|
|a@gmail.com| 2020-03-11 05:04:00| 1|
|a@gmail.com| 2020-04-05 12:04:00| 2|
|b@gmail.com| 2020-05-03 03:04:00| 1|
+-----------+---------------------+-----+
*/
解释:
根据“电子邮件”制作时间戳对(通过电子邮件加入) 比较每一对并检查它是否在过去 30 天内:如果是,则将其标记为 1,否则标记为 0 总结WRT“email”和“transaction_timestamp”的计数假设:(电子邮件,transaction_timestamp)是不同的。如果没有,我们可以通过添加一个 monotonicallyIncreasingId 来处理
【讨论】:
谢谢@Sanket9394,问题是我需要为每一行计算这个计数。如果不使用像您这样的窗口功能,我不确定这是否可行。请查看我添加到问题陈述中的数据框。 此外,您正在尝试查找当前(当前)时间最近 30 天内的哪些行,但它应该来自事务时间戳。 @ic10503 我已经编辑了我的答案。你现在可以检查。这将确保您的所有 5 个核心都获得相同数量的负载。 另外,如果您有更多内核,请使用它们。Increasing the no of cores
将有助于减少此方法的运行时间,但将成为 window
倾斜数据的瓶颈
@Sanket9394,我相信您的方法可以按预期工作。然而,我担心电子邮件分布不均,这并不能解决问题。它甚至可能使情况恶化。实际上,如果我们将 M 称为一封电子邮件的最大记录数,则问题的方法会生成大小为 M 的最大分区,使用您的方法,它将是 M^2。如果你用过滤器替换 when ,它可能会下降到 M^2/2 但我仍然不确定这是解决偏差的方法。以上是关于Spark窗口分区功能需要永远完成的主要内容,如果未能解决你的问题,请参考以下文章
为啥重新部署应用后 Firebase 需要永远完成相同的功能?