使用自定义分区器解决Spark DataSet数据分区不均匀的问题

Posted 大数据技术与数仓

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用自定义分区器解决Spark DataSet数据分区不均匀的问题相关的知识,希望对你有一定的参考价值。

在一文中,介绍了Spark是如何管理分区的,分别解释了Spark提供的两种分区方法,并给出了相应的使用示例和分析,感兴趣的可以参考之前的分享。我们知道,Apache Spark通常用于以分布式方式处理大规模数据集,既然是分布式,就会面临一个问题:数据是否均匀地分布。当数据分布不均匀时,数据量较少的分区将会很快的被执行完成,而数据量较大的分区将需要很长时间才能够执行完毕,这就是我们经常所说的数据倾斜, 这可能会导致Spark作业的性能降低。那么,该如何解决类似的问题呢?我们可以使用 Spark提供的自定义分区器在RDD上应用数据分区的逻辑。以下是正文,希望对你有所帮助。

Spark默认的分区器

Spark在处理大规模数据集时,会将数据分为不同的分区,并以并行方式处理数据。默认情况下,它使用哈希分区程序将数据分散到不同的分区中。哈希分区程序使用的是hashcode函数, 其原理是相等的对象具有相同的哈希码,这样就可以根据key的哈希值,将其分布到各自的分区中。

哈希分区源码

class HashPartitioner(partitions: Intextends Partitioner {
  require(partitions >= 0s"Number of partitions ($partitions) cannot be negative.")

  def numPartitionsInt = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCodeInt = numPartitions
}

示例

// 创建spark session
    val spark = SparkSession
      .builder
      .master("local[4]")
      .getOrCreate
    val bigData = util.Arrays.asList("Hadoop""Spark""Flink""Hive""Impala""Hbase""Kafka""ClickHouse""KUDU""zookeeper")
    import spark.implicits._
    val stringDataset = spark.createDataset(bigData)
    println("当前rdd的分区数为:" + stringDataset.rdd.partitions.length) // 当前rdd的分区数为:4

    /**
      * 每个分区的数据会写入一个对应的分区文件中,
      * 每个分区文件对应的数据如下:
      * Partiiton 0: Hadoop Spark
      * Partition 1: Flink Hive Impala
      * Partition 2: Hbase Kafka
      * Partition 3: ClickHouse KUDU zookeeper
      */

    stringDataset.write.csv("E://testPartition")
    

通常情况下,我们需要加大分区的数量,从而保证每个分区的数据量尽量少,进而可以提升处理的并行度。此时,就需要使用repartition() 方法对数据进行重分区。

repartition() 方法既可以用于增加分区,也可以用于减少分区。比如,对上面的示例进行增加分区,如下代码所示:

val reparationDS = stringDataset.repartition(8)
    println("重分区之后的分区为:" + reparationDS.rdd.partitions.length) // 重分区之后的分区为:8
    /**
      *
      * Partition 0: FlinK ClickHouse
      * Partition 1:
      * Partition 2:
      * Partition 3:
      * Partition 4:
      * Partition 5:
      * Partition 6: Spark Impala Hbase zookeeper
      * Partition 7: Hadoop Hive Kafka KUDU
      */

    reparationDS.write.csv("E://repartition")

上面的数据没有在所有分区中平均分配。尽管通过应用repartition()方法增加了分区的数量,但是数据并不是均匀分布的。上面提到,Spark使用的是哈希分区,所以有时,应用repartition()也可能无法解决问题(可能会存在部分分区无数据,而个别分区数据比较多的情况)。

为了解决这个问题,Spark提供了自定义分区器,用户可以根据处理数据的特点,进行自定义分区器。

如何自定义Spark的分区器

需要注意的是,自定义分区器只能应用于key-value形式的 pair RDD。所以在使用自定义分区器的时候,需要从原始的RDD中创建出PairedRDD,然后再使用自定义分区器。

实现一个自定义的分区器非常简单,只需要继承一个org.apache.spark.Partitioner类,然后重写下面的方法即可:

  • numPartitions:此方法返回要为RDD创建的分区数

  • def getPartition(key: Any):此方法返回key对应的分区号(范围从0到numPartitions - 1)

源码

/**
 * An object that defines how the elements in a key-value pair RDD are partitioned by key.
 * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
 */

abstract class Partitioner extends Serializable {
  def numPartitionsInt
  def getPartition(key: Any): Int
}

自定义分区器

  • 自定义分区类
class CustomerPartition(partitions: Intextends Partitioner {

  def numPartitionsInt = partitions

  def getPartition(key: Any): Int = {

    (key.toString.charAt(0) + scala.util.Random.nextInt(10)) % numPartitions
  }
}
  • 使用示例
object DefaultPartition {

  def main(args: Array[String]): Unit = {

    // 创建spark session
    val spark = SparkSession
      .builder
      .master("local[4]")
      .getOrCreate
    val bigData = util.Arrays.asList("Hadoop""Spark""Flink""Hive""Impala""Hbase""Kafka""ClickHouse""KUDU""zookeeper")
    import spark.implicits._
    val stringDataset = spark.createDataset(bigData)
    println("当前rdd的分区数为:" + stringDataset.rdd.partitions.length) // 当前rdd的分区数为:4

    /**
      * 每个分区的数据会写入一个对应的分区文件中,
      * 每个分区文件对应的数据如下:
      * Partiiton 0: Hadoop Spark
      * Partition 1: Flink Hive Impala
      * Partition 2: Hbase Kafka
      * Partition 3: ClickHouse KUDU zookeeper
      */

    //stringDataset.write.csv("E://testPartition")

    val reparationDS = stringDataset.repartition(8)
    println("重分区之后的分区为:" + reparationDS.rdd.partitions.length) // 重分区之后的分区为:8
    /**
      *
      * Partition 0: FlinK ClickHouse
      * Partition 1:
      * Partition 2:
      * Partition 3:
      * Partition 4:
      * Partition 5:
      * Partition 6: Spark Impala Hbase zookeeper
      * Partition 7: Hadoop Hive Kafka KUDU
      */

    // reparationDS.write.csv("E://repartition")

    val stringRDD = stringDataset.rdd
    val pairRDD = stringRDD.map(word => (word, word.length))
    // 使用自定义分区器
    val resultRDD = pairRDD.partitionBy(new CustomerPartition(8)) // 自定义分区的数量:8
    println("自定义分区的数量:" + resultRDD.getNumPartitions)

    // 数据写入CSV文件
    val outputRDD = resultRDD.map(_._1)
    val outputDS = spark.createDataset(outputRDD)

    /**
      * Partition 0: Hive
      * Partition 1: Impala
      * Partition 2: ClickHouse
      * Partition 3: Spark Hbase
      * Partition 4: KUDU
      * Partition 5: Hadoop
      * Partition 6: Flink
      * Partition 7: Kafka zookeeper
      */

    outputDS.write.csv("E:/customerpartition/")
  }
}

从上面的结果可以看出,数据均匀地分布在了每个分区上,这样就会缓解数据倾斜造成的性能瓶颈。

总结

本文主要分享了如何使用Spark提供的Partitioner类进行自定义一个分区器,并给出了具体的示例。通过自定义分区器,就可以有效地分布数据,从而缓解数据倾斜的性能瓶颈。如果本文对你所有帮助,请分享转发。

     




Spark系列精选
使用自定义分区器解决Spark DataSet数据分区不均匀的问题




☑ 

☑ 

 

 

使用自定义分区器解决Spark DataSet数据分区不均匀的问题
点分享
使用自定义分区器解决Spark DataSet数据分区不均匀的问题
点收藏
点点赞
点在看      

以上是关于使用自定义分区器解决Spark DataSet数据分区不均匀的问题的主要内容,如果未能解决你的问题,请参考以下文章

spark自定义分区器实现

spark自定义分区器

apache spark中的自定义分区器

spark中的分区和自定义分区器中的重新分区和排序给出数组越界异常

Spark 2.0.0:如何使用自定义编码类型聚合 DataSet?

Mongo Spark Connector中的分区器