Spark窗口分区功能需要永远完成

Posted

技术标签:

【中文标题】Spark窗口分区功能需要永远完成【英文标题】:Spark window partition function taking forever to complete 【发布时间】:2020-10-08 14:29:14 【问题描述】:

给定一个数据框,我试图计算过去 30 天内我看到 emailId 的次数。我的函数中的主要逻辑如下:

val new_df = df
  .withColumn("transaction_timestamp", unix_timestamp($"timestamp").cast(LongType))

val winSpec = Window
  .partitionBy("email")
  .orderBy(col("transaction_timestamp"))
  .rangeBetween(-NumberOfSecondsIn30Days, Window.currentRow)

val resultDF = new_df
  .filter(col("condition"))
  .withColumn("count", count(col("email")).over(winSpec))

配置:

spark.executor.cores=5

所以,我可以看到 5 个具有窗口功能的阶段,其中一些阶段完成得非常快(在几秒钟内),还有 2 个甚至在 3 小时内都没有完成,最后被卡住了少量任务(进展非常缓慢):

这对我来说是一个数据倾斜的问题,如果我从数据集中删除所有包含 5 个最高频率 email ids 的行,工作很快就会完成(不到 5 分钟)。

如果我尝试在 window partitionBy 中使用其他键,作业将在几分钟内完成:

 Window.partitionBy("email", "date")

但显然,如果我这样做,它会执行错误的计数计算,这不是一个可接受的解决方案。

我尝试了各种其他的 spark 设置,以增加内存、内核、并行度等,但似乎都没有帮助

Spark 版本:2.2

当前 Spark 配置:

-执行器-内存:100G

-executor-cores: 5

-驱动内存:80G

-spark.executor.memory=100g

使用每台具有 16 核、128 GB 内存的机器。最大节点数最多 500 个。

解决这个问题的正确方法是什么?

更新:只是为了提供更多上下文,这里是原始数据帧和相应的计算数据帧:

 val df = Seq(
      ("a@gmail.com", "2019-10-01 00:04:00"),
      ("a@gmail.com", "2019-11-02 01:04:00"), 
      ("a@gmail.com", "2019-11-22 02:04:00"),
      ("a@gmail.com", "2019-11-22 05:04:00"),
      ("a@gmail.com", "2019-12-02 03:04:00"),
      ("a@gmail.com", "2020-01-01 04:04:00"),
      ("a@gmail.com", "2020-03-11 05:04:00"),
      ("a@gmail.com", "2020-04-05 12:04:00"),
      ("b@gmail.com", "2020-05-03 03:04:00")  
    ).toDF("email", "transaction_timestamp")


val expectedDF = Seq(
      ("a@gmail.com", "2019-10-01 00:04:00", 1),
      ("a@gmail.com", "2019-11-02 01:04:00", 1), // prev one falls outside of last 30 days win
      ("a@gmail.com", "2019-11-22 02:04:00", 2),
      ("a@gmail.com", "2019-11-22 05:04:00", 3),
      ("a@gmail.com", "2019-12-02 03:04:00", 3),
      ("a@gmail.com", "2020-01-01 04:04:00", 1),
      ("a@gmail.com", "2020-03-11 05:04:00", 1),
      ("a@gmail.com", "2020-04-05 12:04:00", 2),
      ("b@gmail.com", "2020-05-03 03:04:00", 1) // new email
).toDF("email", "transaction_timestamp", count") 

【问题讨论】:

您能否提及您的机器规格。 ?总内核数和内存? 【参考方案1】:

您的某些分区可能太大,这是因为对于某些电子邮件,一个月内的数据太多。

要解决此问题,您可以创建一个仅包含电子邮件和时间戳的新数据框。然后,您按电子邮件和时间戳分组,计算行数并在希望少得多的数据上计算窗口。如果时间戳倾向于重复,即df.count 远大于df.select("email", "timestamp").distinct.count,则计算将加快。如果不是这种情况,您可以以损失一些精度为代价截断时间戳。这样,您无需计算过去 30 天内发生的次数(给定或花费一秒,因为时间戳以秒为单位),您将计算给定或花费一分钟或一小时甚至一天的发生次数,具体取决于您的需要。你会失去一点精度,但会大大加快计算速度。而且您提供的精度越高,您获得的速度就越快。

代码如下所示:

// 3600 means hourly precision.
// Set to 60 for minute precision, 1 for second precision, 24*3600 for one day.
// Note that even precisionLoss = 1 might make you gain speed depending on
// the distribution of your data
val precisionLoss = 3600 
val win_size = NumberOfSecondsIn30Days / precisionLoss

val winSpec = Window
  .partitionBy("email")
  .orderBy("truncated_timestamp")
  .rangeBetween(-win_size, Window.currentRow)

val new_df = df.withColumn("truncated_timestamp",
                      unix_timestamp($"timestamp") / 3600 cast "long")

val counts = new_df
  .groupBy("email", "truncated_timestamp")
  .count
  .withColumn("count", sum('count) over winSpec)

val result = new_df
  .join(counts, Seq("email", "truncated_timestamp"))

【讨论】:

非常感谢@Oli,这个解决方案很棒,但我仍然想知道为什么当我们失去一些精度时它运行得更快。是因为它能够进一步分区数据吗?我发现完全没有关于窗口函数如何在幕后工作的文档。 查看物理查询计划,与没有精度损失时相比,它看起来需要回溯的行数要少得多。这对我来说很有意义。 windows 在后台工作的方式非常简单。当您通过电子邮件进行分区时,数据会通过电子邮件重新分区,因此会有一个昂贵的洗牌。然后,每封电子邮件都关联一个任务,由一台机器上的一个核心处理。任务执行窗口计算。因此,如果一封电子邮件的数据比其他电子邮件多得多,则相应的任务将永远持续下去,整个工作也会如此,只是等待一位执行者完成他的工作。 这就是为什么我的第一个想法是减小窗口的大小,但结果令人失望。在这里,窗口计算更快有两个原因。数据框的列更少,因此记录更小(我们只保留两列,也许你有很多列,这使得计算更慢)。但主要是,我们通过聚合首先靠近的时间戳来减小窗口的大小。我们减小了窗口大小和倾斜度。 如果您还有更多类似的问题,请随时联系我。我真的很喜欢做这个,尽管我对第一个未能提高性能感到非常失望;-)【参考方案2】:

你是对的,这是一个数据倾斜问题,减小窗口大小会很有帮助。要仅获取有关过去 30 天的信息,您无需一直到时间开始。再一次,如果建立一个带有时间索引的窗口,在每个窗口开始时计算将是错误的,因为它无法访问前一个窗口。

我的建议是构建一个每30天递增一次的索引和两个大小为60天的重叠窗口,如下图所示:

要了解其工作原理,让我们考虑如图所示的带有index=2 的数据点。如果您有一个大小为 30 天的窗口,则它需要访问其窗口内和前一个窗口内的数据。那是不可能的。这就是我们构建更大的窗口以便我们可以访问所有数据的原因。如果我们考虑win1,我们会遇到与大小为 30 天的索引相同的问题。但是,如果我们考虑win2,则所有数据都在索引1的窗口中可用。

对于索引为 3 的点,我们将使用 win1。对于索引为 4 的点,win2 等。基本上,对于偶数索引,我们使用 win2。对于奇数索引,我们使用win1。这种方法将大大减少最大分区大小,从而减少单个任务中处理的最大数据量。

代码只是上面解释的翻译:

val winSize = NumberOfSecondsIn30Days

val win1 = Window
    .partitionBy("email", "index1")
    .orderBy(col("transaction_timestamp"))
    .rangeBetween(-winSize, Window.currentRow)
val win2 = Window
    .partitionBy("email", "index2")
    .orderBy(col("transaction_timestamp"))
    .rangeBetween(-winSize, Window.currentRow)

val indexed_df = new_df
    // the group by is only there in case there are duplicated timestamps,
    // so as to lighten the size of the windows
    .groupBy("email", "transaction_timestamp")
    .count()
    .withColumn("index",
        'transaction_timestamp / winSize cast "long")
    .withColumn("index1",
        ('transaction_timestamp / (winSize * 2)) cast "long")
    .withColumn("index2",
        (('transaction_timestamp + winSize) / (winSize * 2)) cast "long")

val result = indexed_df
    .withColumn("count", when(('index mod 2) === 0, sum('count) over win2)
                                      .otherwise(sum('count) over win1))

【讨论】:

感谢@Oli,我不确定我是否遵循 winSize 逻辑对时间戳的划分。请查看我添加到问题陈述中的示例数据框。你的代码还能用吗? 我意识到我忘记了窗口声明。我编辑了我的答案。它产生与您的代码相同的结果。 顺便说一句,您的示例中有错误。对于这个时间戳"2020-01-01 04:04:00",您期望2。但之前的时间戳是"2019-12-02 03:04:00",即 30 天零 1 小时前。因此它在窗口之外,正确的结果是1 关于第1个关注点,'index表示名为索引的列,如col("index")$"index"。关于第 2 个问题,做与其他问题相同的事情,如果我理解正确,请将 count('*) 替换为 count(when(condition, '*)) 你能提供indexed_df.join( df.groupBy("email").count.orderBy('count desc).limit(1).select("email"), Seq("email")).groupBy("index").count.orderBy('count desc).show的结果吗?目标是确定具有最大数据量的电子邮件的月份分布。【参考方案3】:

我们仍然可以为此避免使用 Window

对于上面提到的df

val df2 = df.withColumn("timestamp", unix_timestamp($"transaction_timestamp").cast(LongType))

val df3 = df2.withColumnRenamed("timestamp","timestamp_2").drop("transaction_timestamp")

val finalCountDf = df2.join(df3,Seq("email"))
.withColumn("is_within_30", when( $"timestamp" - $"timestamp_2" < NumberOfSecondsIn30Days && $"timestamp" - $"timestamp_2" > 0 , 1).otherwise(0))
.groupBy("email","transaction_timestamp").agg(sum("is_within_30") as "count")
.withColumn("count",$"count"+1)

finalCountDf.orderBy("transaction_timestamp").show
/*
+-----------+---------------------+-----+
|      email|transaction_timestamp|count|
+-----------+---------------------+-----+
|a@gmail.com|  2019-10-01 00:04:00|    1|
|a@gmail.com|  2019-11-02 01:04:00|    1|
|a@gmail.com|  2019-11-22 02:04:00|    2|
|a@gmail.com|  2019-11-22 05:04:00|    3|
|a@gmail.com|  2019-12-02 03:04:00|    3|
|a@gmail.com|  2020-01-01 04:04:00|    1|
|a@gmail.com|  2020-03-11 05:04:00|    1|
|a@gmail.com|  2020-04-05 12:04:00|    2|
|b@gmail.com|  2020-05-03 03:04:00|    1|
+-----------+---------------------+-----+
*/

解释:

根据“电子邮件”制作时间戳对(通过电子邮件加入) 比较每一对并检查它是否在过去 30 天内:如果是,则将其标记为 1,否则标记为 0 总结WRT“email”和“transaction_timestamp”的计数

假设:(电子邮件,transaction_timestamp)是不同的。如果没有,我们可以通过添加一个 monotonicallyIncreasingId 来处理

【讨论】:

谢谢@Sanket9394,问题是我需要为每一行计算这个计数。如果不使用像您这样的窗口功能,我不确定这是否可行。请查看我添加到问题陈述中的数据框。 此外,您正在尝试查找当前(当前)时间最近 30 天内的哪些行,但它应该来自事务时间戳。 @ic10503 我已经编辑了我的答案。你现在可以检查。这将确保您的所有 5 个核心都获得相同数量的负载。 另外,如果您有更多内核,请使用它们。 Increasing the no of cores 将有助于减少此方法的运行时间,但将成为 window 倾斜数据的瓶颈 @Sanket9394,我相信您的方法可以按预期工作。然而,我担心电子邮件分布不均,这并不能解决问题。它甚至可能使情况恶化。实际上,如果我们将 M 称为一封电子邮件的最大记录数,则问题的方法会生成大小为 M 的最大分区,使用您的方法,它将是 M^2。如果你用过滤器替换 when ,它可能会下降到 M^2/2 但我仍然不确定这是解决偏差的方法。

以上是关于Spark窗口分区功能需要永远完成的主要内容,如果未能解决你的问题,请参考以下文章

为啥重新部署应用后 Firebase 需要永远完成相同的功能?

Spark SQL表分区找不到文件

Spark,Hive SQL - 实现窗口功能?

Spark 和 SparkSQL:如何模仿窗口功能?

Spark Hive - 具有窗口功能的 UDFArgumentTypeException?

Spark SQL/Hive 查询永远需要加入