使用 Dataset.groupByKey 时如何绕过 2GB 缓冲区限制?

Posted

技术标签:

【中文标题】使用 Dataset.groupByKey 时如何绕过 2GB 缓冲区限制?【英文标题】:How can you get around the 2GB buffer limit when using Dataset.groupByKey? 【发布时间】:2018-10-08 00:38:08 【问题描述】:

在 Spark 中使用 Dataset.groupByKey(_.key).mapGroupsDataset.groupByKey(_.key).cogroup 时,当其中一个分组导致超过 2GB 的数据时,我遇到了问题。

在开始减少数据之前,我需要按组对数据进行规范化,并且我想将这些组分成更小的子组,以便它们更好地分布。例如,这是我尝试拆分组的一种方法:

val groupedInputs = inputData.groupByKey(_.key).mapGroups 
    case(key, inputSeries) => inputSeries.grouped(maxGroupSize).map(group => (key, group))

但不幸的是,尽管我尝试解决它,但我的工作总是因以下错误而死:java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size 23816 because the size after growing exceeds size limitation 2147483632。使用 Kryo 序列化时,我收到不同的 Kryo serialization failed: Buffer overflow 错误,建议我增加 spark.kryoserializer.buffer.max,但我已经将其增加到 2GB 限制。

我想到的一个解决方案是在对键进行分组之前向它们添加一个随机值。这并不理想,因为它会分裂每个群体(不仅仅是大群体),但我愿意为了“工作”而牺牲“理想”。该代码看起来像这样:

val splitInputs = inputData.map( record => (record, ThreadLocalRandom.current.nextInt(splitFactor)))
val groupedInputs = splitInputs.groupByKey case(record, split) => (record.key, split)).mapGroups 
    case((key, _), inputSeries) => inputSeries.grouped(maxGroupSize).map(group => (key, group.map(_._1)))

【问题讨论】:

【参考方案1】:

添加一个盐键并在您的键和盐键上执行 groupBy 以及稍后

import scala.util.Random
    val start = 1
      val end   = 5
      val randUdf = udf(() => start + Random.nextInt((end - start) + 1))

      val saltGroupBy=skewDF.withColumn("salt_key", randUdf())
        .groupBy(col("name"), col("salt_key"))

因此,您所有的倾斜数据都不会进入一个执行程序并导致 2GB 限制。

但是你必须开发一个逻辑来聚合上面的结果,最后在最后移除盐键。

当您使用 groupBy 时,具有相同键的所有记录将到达一个执行器并出现瓶颈。 以上是缓解的方法之一。

【讨论】:

谢谢,这与我正在做的类似,只是您使用的是 Dataframe API。如果我可以提出一些建议:而不是在这里使用 UDF,您可以使用内置的 rand 函数,如下所示:skewDF.withColumn("salt_key", (rand * 5).cast(IntegerType)) 这应该比 UDF 版本的性能更好。【参考方案2】:

在这种情况下,数据集有很多偏差,将记录分组到常规大小的组很重要,我决定分两次处理数据集。首先,我使用窗口函数按键对行进行编号,然后根据可配置的“maxGroupSize”将其转换为“组索引”:

// The "orderBy" doesn't seem necessary here, 
// but the row_number function requires it.
val partitionByKey = Window.partitionBy(key).orderBy(key)

val indexedData = inputData.withColumn("groupIndex", 
  (row_number.over(partitionByKey) / maxGroupSize).cast(IntegerType))
  .as[(Record, Int)]

然后我可以按键和索引进行分组,并生成大小一致的组——记录多的键被拆分得更多,而记录少的键可能根本不拆分。

indexedData.groupByKey case (record, groupIndex) => (record.key, groupIndex) 
  .mapGroups case((key, _), recordGroup) =>
      // Remove the index values before returning the groups
      (key, recordGroup.map(_._1))
  

【讨论】:

当您确定分组会产生超过 2GB 的数据时,请避免使用 groupByKey()。对于较大的数据集,首选 reduceByKey()。欲了解更多信息,请参阅:databricks.gitbooks.io/databricks-spark-knowledge-base/content/… @Prazy 这通常是个好建议——避免 groupByKey 并改用 reduceByKey——但在这种特殊情况下并不实用,因为我正在执行“map”操作而不是“reduce”。这就是为什么我会费力地将密钥分成更小的组。

以上是关于使用 Dataset.groupByKey 时如何绕过 2GB 缓冲区限制?的主要内容,如果未能解决你的问题,请参考以下文章

使用 SpriteKit 时如何暂停声音

如何在使用 observable 更新数组时更新 *ngFor 以及如何在网站加载时初始化 observable?

按下标签栏项目时如何显示警报视图以及在其他目标中使用时如何忽略它?

在使用 jupyter notebook 时如何在 pandas 中使用 Dataframe 时查看完整数据? [复制]

使用`FALLBACK`时如何知道是不是返回左值?

使用google可视化api时,使用dashboard时如何使用getChartLayoutInterface()