Spark缓慢重新分区许多小文件

Posted

技术标签:

【中文标题】Spark缓慢重新分区许多小文件【英文标题】:Spark slow repartitioning many small files 【发布时间】:2018-10-02 10:45:17 【问题描述】:

我正在尝试读取一个包含许多小拼花文件的文件夹:600 个文件,每个 500KB。然后repartition他们分成2个文件。

val df = spark.read.parquet("folder")
df.repartition(2).write.mode("overwrite").parquet("output_folder")

这非常慢,最多 10 分钟。从 spark UI 我可以看到 2 个执行程序正在处理 2 个任务。我给每个执行者 10GB 内存。

那么速度慢的原因是什么?是因为磁盘IO吗?在这种情况下如何提高性能。

编辑:我也尝试过使用coalesce,性能看起来并没有什么不同。

【问题讨论】:

请查看下方希望对您有所帮助! 每个执行器(spark.executor.cores)有多少个核心?如果只有一个,这可能是慢线的原因。 @pasha701 每个有 5 个,但我认为这并不重要,因为只有 2 个任务,所以只会使用 2 个内核。 600 个文件的输入,在阅读过程中可能会很慢,这就是内核很重要的原因。只是“df.count”工作得很快? @pasha701 df.count 很快。除了执行程序核心之外,我还可以尝试哪些其他设置?我认为 5 通常是该设置的最佳值。 【参考方案1】:

第一个选项是在源代码级别用小 parquet 文件制作一个大文件,是将它们合并为多个文件 > 128 mb 大小的文件或您想要的任何大小

how to merge multiple parquet files to single parquet file using linux or hdfs command?

第二个选项,即使用 spark :读取小型 parquet 文件,然后在使用 spark 实际数据业务处理逻辑之前将它们写入您所期望的相对较大的文件中(通过考虑性能因素考虑)


第二个选项

尽管我不知道您的 spark 作业配置是什么...但总的来说 coalesce 应该可以工作...尝试像下面的示例(主 -> 本地但将其更改为适用于您的应用程序的纱线)为了我。 在此示例中,我在 src/main/resources 下获取了小文件“./userdata*.parquet”(5 个小文件,大约 110 KB) 并用coalesce合并到最后2个文件中...

方法: 将每个 parquet 文件读取为一个数据帧,然后合并以生成单个数据帧,然后 coalesce 它。

  package com.examples

import org.apache.hadoop.conf._
import org.apache.hadoop.fs._
import org.apache.log4j.Level, Logger
import org.apache.spark.internal.Logging
import org.apache.spark.sql.DataFrame, SaveMode, SparkSession

import scala.collection.mutable

/** *
  * take small pegs and make a large peg
  * and coalesce it
  *
  * @author : Ram Ghadiyaram
  */
object ParquetPlay extends Logging 
  Logger.getLogger("org").setLevel(Level.OFF)


  //public FileStatus[] globStatus(Path pathPattern) throws IOException
  def main(args: Array[String]): Unit = 


 val appName = if (args.length >0) args(0) else this.getClass.getName
    val spark: SparkSession = SparkSession.builder
      .config("spark.master", "local")
      .appName(appName)
      .getOrCreate()
    val fs = FileSystem.get(new Configuration())

    val files = fs.globStatus(new Path("./userdata*.parquet")).map(_.getPath.toString)
    val dfSeq = mutable.MutableList[DataFrame]()
    println(dfSeq)
    println(files.length)
    files.foreach(x => println(x))
    val newDFs = files.map(dir => 
      dfSeq += spark.read.parquet(dir).toDF()
    )
    println(dfSeq.length)
    val finalDF = dfSeq.reduce(_ union _)
      .toDF
    finalDF.show(false)
    println(System.getProperty("java.io.tmpdir"))
    println(System.getProperties.toString)
    finalDF.coalesce(2)
      .write
      .mode(SaveMode.Overwrite)
      .parquet(s"$System.getProperty("java.io.tmpdir")/final.parquet")
    println("done")
  

结果: 大小几乎相等的 2 个文件,如下所示...在示例中再次生成小文件,但在您的情况下,因为您有 500KB 大小和大约 600 个文件,您可以看到文件的大小和 你可以决定coalesce(你期望的分区数)

第三个选项:正如评论中提到的Minh(原始发帖人)...可能有大文件被高度压缩,压缩后变小可能会导致这个。

【讨论】:

当然,它适用于您的代码和我的代码。问题是它。这就是我要解决的问题。 AFAIK 除了上述2之外,我没有其他方法我知道。您是否也尝试过上述方法?你在 spark-submit 中的配置是什么? 我给每个执行器 5 个核心和 10GB 内存。执行器的数量似乎并不相关,因为在 Spark UI 中,只有 2 个任务分配给 2 个执行器。 是的,执行者的数量并不是那么小。因为这里没有大数据。我看到每个任务都在 45 秒内运行。你说这很慢吗?你的期望是什么?也可能存在调度/分配延迟。这不在您的工作范围内。不是吗? 好的!除了程序之外,可能还有另一个原因。我觉得获得 sc 或调度延迟有些延迟......可能是......【参考方案2】:

这是 Spark 当前的一个折衷方案(3.0 版应该会解决这个问题),因为任务数量应该与文件数量的 1x1 映射有关……所以任务数量越多,性能越好但从分区的角度来看确实不理想,因为在这种情况下文件可能非常小。

另一个问题是,在大多数情况下,最终重新分区的数据集的容量会增加,因为压缩算法不再具有有关密钥的信息。对于现实生活中的大数据来说,这是一个主要问题,因为磁盘空间占用率将大幅增长。对于非常嵌套的数据集尤其如此。

解决方案是将数据集扁平化为简单模式,以便我们每次写入磁盘时都可以利用压缩算法。

希望有帮助!

【讨论】:

以上是关于Spark缓慢重新分区许多小文件的主要内容,如果未能解决你的问题,请参考以下文章

重新分区分区数据

为啥在 Spark 中重新分区比 partitionBy 快?

Spark中的最佳重新分区方式

Spark:读取文本文件后的重新分区策略

在 Spark 中处理压缩文件:重新分区可以提高还是降低性能

Spark重新分区不均匀分布记录