Spark parquet 数据帧分区数

Posted

技术标签:

【中文标题】Spark parquet 数据帧分区数【英文标题】:Spark parquet data frame number of partitions 【发布时间】:2017-06-28 20:40:23 【问题描述】:

我有一个 HDFS 文件夹,里面有两个 250MB 的 parquet 文件。 hadoop df 块大小设置为 128MB。 有以下代码:

    JavaSparkContext sparkContext = new JavaSparkContext();

    SQLContext sqlContext = new SQLContext(sparkContext);
    DataFrame dataFrame = sqlContext.read().parquet("hdfs:////user/test/parquet-folder");
    LOGGER.info("Nr. of rdd partitions: ", dataFrame.rdd().getNumPartitions());

    sparkContext.close();

我在 spark.executor.instances=3 和 spark.executor.cores=4 的集群上运行它。我可以看到 parquet 文件的读取分为 3 个执行器 X 4 个核心 = 12 个任务:

   spark.SparkContext: Starting job: parquet at VerySimpleJob.java:25
   scheduler.DAGScheduler: Got job 0 (parquet at VerySimpleJob.java:25) with 12 output partitions

但是,当我得到数据帧 RDD(或使用 toJavaRDD() 创建 RDD)调用时,我只得到 4 个分区。这是否由 hdfs 块大小控制 - 每个文件 2 个块,因此 4 个分区?

为什么这与 parquet(父?)操作的分区数不完全匹配?

【问题讨论】:

在下面回答,但总的来说你是对的——这都是关于 HDFS 块大小的。 根据@Zyoma 的建议,我更新了代码,试图强制进行更小的拆分,这将为数据框提供更多的输入分区。以下配置已更改: parquet.block.size、mapred.max.split.size、mapred.min.split.size 均设置为 Long.toString(8 * 1024 * 1024L) 。这 still 给了我 4 个分区 toJavaRDD 调用后如何获得更多分区的答案? 【参考方案1】:

当您使用 Spark 读取文件时,执行程序的数量和内核的数量都不会以任何方式影响任务的数量。分区的数量(以及作为结果的任务)仅由输入中的块数决定。如果您有 4 个文件小于 HDFS 块大小 - 无论如何都是 4 个块,结果是 4 个分区。公式为 number_of_files * number_of_blocks_in_file。因此,请查看您的文件夹并计算其中包含多少文件以及每个文件的大小。这应该可以回答您的问题。

UPD:如果您没有手动重新分区您的 DataFrame 并且您的 DataFrame 不是由于加入或任何其他 shuffle 操作而创建的,那么上述所有内容都是正确的。

UPD: 修复了答案详情。

【讨论】:

我的文件夹包含 2 个文件,每个文件有 250MB。所以基本上你是说没有办法拥有比块数更多的分区(在这种情况下,4个128 MB的块)?为什么我会在最初读取文件时看到创建了 12 个任务?还是我对这 12 个任务的解释是错误的?在这里:***.com/questions/27194333/… 有人建议用更小的 parquet.block.size 编写镶木地板文件可能会奏效 - 但我尝试过设置但没有运气。 正确。您始终可以使用 repartition 方法强制分区数。 我知道重新分区是一种选择,但这会触发洗牌,这不是最佳选择。我在集群中有更多的核心 * 执行程序,我想充分利用它们,理想情况下是从初始读取操作中获得更多分区。 你是如何设置“parquet.block.size”属性的?像这样 sparkContext.hadoopConfiguration.set("parquet.block.size", size) ? 无论如何。这完全没有关系。 parquet 块大小控制写入 parquet 文件时内存中数据的块大小。它会影响压缩率和其他一些因素,但不会显着影响分区。

以上是关于Spark parquet 数据帧分区数的主要内容,如果未能解决你的问题,请参考以下文章

将 Spark 数据帧写入带分区的 CSV

将 Spark 数据帧写入带分区的 CSV

为分区数据定义 Impala 表模式

重新分区 pyspark 数据帧失败以及如何避免初始分区大小

将 Spark 数据帧保存为 Hive 中的动态分区表

spark数据帧的分区数?