我们如何在 Apache Spark 中执行动态重新分区?

Posted

技术标签:

【中文标题】我们如何在 Apache Spark 中执行动态重新分区?【英文标题】:How do we perform dynamic repartionting in Apache Spark? 【发布时间】:2018-02-13 18:01:55 【问题描述】:

让我们假设我们必须在过滤后重新分区数据集或获得度并行度。

我们如何执行动态重新分区而不是手动调整分区数量?

注意 - 寻找 RDD、数据框和数据集的解决方案。

【问题讨论】:

【参考方案1】:

您可以使用 repartition(colname) 或 partitionBy() 对数据集进行动态分区。

例如,如果您的数据集如下所示

 create table sensor_data (
  sensor_id bigint,
  temp  float,
  region_id  string,
  state  string,
  country   string
 ) partition by ( day string)

如果您想针对特定日期进行区域计算,

val sensor_data = spark.sql("select * from sensor_data where day='2018-02-10')
val results = sensor_data.
     repartition(col("region_id")).
     mapPartitions( eventIter =>  
       processEvent(eventIter).iterator
  )

 case Event(sensor_id: String, country: String, max_temp: float)


 def processEvent(evtIter: Iterator[Row]) : List[Event] = 
    val maxTempEvents =  ListBuffer[Event]()
    while (evtIter.hasNext) 
       val evt = evtIter.next()
       // do your calculation and add results to maxTempEvents list
    
   maxTempEvents
 

希望这会有所帮助。

谢谢 拉维

【讨论】:

如果能举个例子就好了。 非常感谢,如果您能回答以下问题,那将非常有帮助 - Link

以上是关于我们如何在 Apache Spark 中执行动态重新分区?的主要内容,如果未能解决你的问题,请参考以下文章

DAG 如何让 Apache Spark 容错?

如何在 spark scala 中重命名 S3 文件而不是 HDFS

如何获取有关当前执行程序 Apache-Spark 的元数据?

提效 7 倍,Apache Spark 自适应查询优化在网易的深度实践及改进

一文了解 Apache Spark 3.0 动态分区裁剪(Dynamic Partition Pruning)

区分 Apache Spark 中的驱动程序代码和工作代码