如何在 partitionBy 输出之前平衡 Spark DataFrame 数据

Posted

技术标签:

【中文标题】如何在 partitionBy 输出之前平衡 Spark DataFrame 数据【英文标题】:How to balance Spark DataFrame data before partitionBy output 【发布时间】:2020-04-21 04:53:08 【问题描述】:

我有带有地理空间数据的 DataFrame。我想按列对它进行分区:数据源idquadkeys

主要目标是一方面使文件的最小数量不大于特定大小。

另一方面,我想优化我的内存分区(如果太大,我可能会遇到内存不足异常)。

换句话说,我有qk = 0,1,2,3,以及每个quadkey的下一个记录数:0 (1000), 1 (1000), 2 (10000), 3(100000). At output files I desire to have next files (not more than 5000 records on each): 0 (1000) 1 (1000) 2_1 (5000) 2_2 (5000) 3_1 (5000) ... 3_10 (5000)

作为一种解决方案,我构建了一个分布查找而不是盐。我想知道有什么更好的解决方案。

val MAX_PARTITION_ROWS = 3000000

val pid = "sourceid"
val qk = "qk"
val distributionDf: Array[((Int, String), Int)] = stage.
  select(col(pid), col(qk), lit(1L) as "cnt").
  groupBy(pid,qk).
  agg(sum("cnt") as "sum").
  rdd.map(r=>((r.getInt(0), r.getString(1))->r.getLong(2).toInt)).collect
LOG.info(s"QK distribution\nQK |PID |Count |Partitions number")
LOG.info(s"$distributionDf.sortBy(-_._2).map(x=>s"$x._1._2 |$x._1._1 | $x._2 | $x._2/MAX_PARTITION_ROWS").mkString("\n")")

val r = scala.util.Random
val distributionMap = distributionDf map  case (k, v) => k -> (v/MAX_PARTITION_ROWS+1) toMap
val saltUdf = udf((pid: Int, qk: String) => 
  val dev = distributionMap.getOrElse((pid, qk), 1)
  r.nextInt(dev)
)

val partitionsNumber = if (distributionMap.isEmpty) 1 else distributionMap.values.sum

stage.withColumn("salt", saltUdf(col(pid),col(qk)))
  .repartition(partitionsNumber, col(pid), col(qk), col("salt"))
  .drop("salt")
  .write.partitionBy(pid, qk)
  .format("parquet")
  .option("maxRecordsPerFile", (MAX_PARTITION_ROWS*1.2).toInt)
  .option("compression", "gzip")
  .save(destUrl)

【问题讨论】:

如果您提供示例数据框或数据回答者将给出您期望的正确响应。否则很难测试你的代码。 【参考方案1】:

您可以使用maxRecordsPerFile 记录的here

示例实现

val df = spark.range(100).coalesce(1)
df.write.option("maxRecordsPerFile", 50).save(filePath)

【讨论】:

我也使用该选项。以上更新。问题是,对于某些 qk,我可以获得超过 50 Gbs 的分区,而其他分区可能只有几兆字节。这会大大减慢刷新操作,有时会导致 OOM 错误,因为在重新分区阶段这会大于 2Gb。

以上是关于如何在 partitionBy 输出之前平衡 Spark DataFrame 数据的主要内容,如果未能解决你的问题,请参考以下文章

如何在窗口 scala/spark 中使用 partitionBy 函数

在 pyspark 数据框中使用 write.partitionBy 时如何删除重复项?

在 Pyspark 中,如何在 partitionBy 和 orderBy 之后进行分组?

SQL Server sp_help:如何限制输出窗口的数量

如何在 SDL Tridion 2011 SP1 中将一个 XSLT TBB 的输出传递到另一个 XSLT TBB

关于如何平衡不平衡的数据