什么时候在 Spark 中执行 .repartition(Int AnyValue),在读取 Parquet 之后(或)在 Parquet 上运行计算之后?
Posted
技术标签:
【中文标题】什么时候在 Spark 中执行 .repartition(Int AnyValue),在读取 Parquet 之后(或)在 Parquet 上运行计算之后?【英文标题】:When to do .repartition(Int AnyValue) in Spark, right after reading the Parquet (or) after running computations on that Parquet? 【发布时间】:2019-06-05 15:53:41 【问题描述】:我有一个基于日期列分区的镶木地板文件,执行 .reparition() 的优化方法是什么?我应该在读完镶木地板后立即做吗,如下所示:
val myPq = "/hive/data/myPq.db/myPq"
sparkSession.read
.format("parquet")
.load(inputConfig.getString("myPq")).repartition(200).createOrReplaceTempView("myPqView")
或者在执行过滤器、地图等各种转换之后。
def readMyPq()
val myPq = "/hive/data/myPq.db/myPq"
sparkSession.read
.format("parquet")
.load(inputConfig.getString("myPq")).repartition(200).createOrReplaceTempView("myPqView")
val returnDF = spark.sql("Multiple Transformations from createOrReplaceTempView")
什么是好方法?提前谢谢你
【问题讨论】:
【参考方案1】:这完全取决于您认为您的数据何时最不平衡。如果数据以不平衡的方式读入(例如 1 个大分区,199 个小分区),您可能需要立即重新分区。
如果数据在你读入时是平衡的,但在你的 filter/map/etc 之后有一个大分区和一堆小分区,此时你可能需要重新分区。
如果您的数据需要,甚至可以在这两个地方重新分区。 请记住,重新分区并不便宜,应该与不重新分区的成本相平衡。使用一些正常数据流样本进行基准测试并查看结果。
【讨论】:
【参考方案2】:我看不出您的代码 sn-ps 之间有什么区别,除了第二个是包含第一个的方法。但是,重新分区会在执行程序之间打乱您的数据。因此,如果您可以选择稍后在代码中重新分区,最好在尽可能小的数据集上重新分区。如果您使用指定的过滤器并且这会过滤掉大量数据,那么最好在发生这种情况后调用 repartition。
【讨论】:
嘿乔纳森,第二个 sn-p 有很多我编辑过的转换。 @Pavan_Obj,您能否简要介绍一下这些转换,以便我更好地提供帮助?即使没有代码,它仍然会有所帮助。类似于“将每一行映射到一个数组,然后基于某个属性的 flatMap 元素,然后过滤掉空元素”。即使有了这些细节,也更容易提供帮助以上是关于什么时候在 Spark 中执行 .repartition(Int AnyValue),在读取 Parquet 之后(或)在 Parquet 上运行计算之后?的主要内容,如果未能解决你的问题,请参考以下文章
Spark groupBy vs repartition plus mapPartitions
Spark 中的 Rebalance 操作以及与Repartition操作的区别
在 Spark 2 中使用 DataSet.repartition - 多个任务处理多个分区
使用 reduceByKey(numPartitions) 或 repartition 规范化 SPARK RDD 分区