什么时候在 Spark 中执行 .repartition(Int AnyValue),在读取 Parquet 之后(或)在 Parquet 上运行计算之后?

Posted

技术标签:

【中文标题】什么时候在 Spark 中执行 .repartition(Int AnyValue),在读取 Parquet 之后(或)在 Parquet 上运行计算之后?【英文标题】:When to do .repartition(Int AnyValue) in Spark, right after reading the Parquet (or) after running computations on that Parquet? 【发布时间】:2019-06-05 15:53:41 【问题描述】:

我有一个基于日期列分区的镶木地板文件,执行 .reparition() 的优化方法是什么?我应该在读完镶木地板后立即做吗,如下所示:

 val myPq = "/hive/data/myPq.db/myPq"
    sparkSession.read
      .format("parquet")
      .load(inputConfig.getString("myPq")).repartition(200).createOrReplaceTempView("myPqView")

或者在执行过滤器、地图等各种转换之后。

def readMyPq()
val myPq = "/hive/data/myPq.db/myPq"
    sparkSession.read
      .format("parquet")
   .load(inputConfig.getString("myPq")).repartition(200).createOrReplaceTempView("myPqView")

val returnDF = spark.sql("Multiple Transformations from createOrReplaceTempView")


什么是好方法?提前谢谢你

【问题讨论】:

【参考方案1】:

这完全取决于您认为您的数据何时最不平衡。如果数据以不平衡的方式读入(例如 1 个大分区,199 个小分区),您可能需要立即重新分区。

如果数据在你读入时是平衡的,但在你的 filter/map/etc 之后有一个大分区和一堆小分区,此时你可能需要重新分区。

如果您的数据需要,甚至可以在这两个地方重新分区。 请记住,重新分区并不便宜,应该与不重新分区的成本相平衡。使用一些正常数据流样本进行基准测试并查看结果。

【讨论】:

【参考方案2】:

我看不出您的代码 sn-ps 之间有什么区别,除了第二个是包含第一个的方法。但是,重新分区会在执行程序之间打乱您的数据。因此,如果您可以选择稍后在代码中重新分区,最好在尽可能小的数据集上重新分区。如果您使用指定的过滤器并且这会过滤掉大量数据,那么最好在发生这种情况后调用 repartition。

【讨论】:

嘿乔纳森,第二个 sn-p 有很多我编辑过的转换。 @Pavan_Obj,您能否简要介绍一下这些转换,以便我更好地提供帮助?即使没有代码,它仍然会有所帮助。类似于“将每一行映射到一个数组,然后基于某个属性的 flatMap 元素,然后过滤掉空元素”。即使有了这些细节,也更容易提供帮助

以上是关于什么时候在 Spark 中执行 .repartition(Int AnyValue),在读取 Parquet 之后(或)在 Parquet 上运行计算之后?的主要内容,如果未能解决你的问题,请参考以下文章

Spark groupBy vs repartition plus mapPartitions

Spark中repartition和coalesce的用法

Spark 中的 Rebalance 操作以及与Repartition操作的区别

在 Spark 2 中使用 DataSet.repartition - 多个任务处理多个分区

使用 reduceByKey(numPartitions) 或 repartition 规范化 SPARK RDD 分区

Spark 重新分区执行器