“Exchange hashpartitioning”如何在 spark 中工作

Posted

技术标签:

【中文标题】“Exchange hashpartitioning”如何在 spark 中工作【英文标题】:how is "Exchange hashpartitioning" working in spark 【发布时间】:2019-01-16 11:00:46 【问题描述】:

我有一个数据集,我想将其写入 parquet 文件,以便之后通过 Spark 请求这些文件,包括 Predicate Pushdown。

目前我使用按列重新分区和分区数将数据移动到特定分区。该列正在标识相应的分区(从 0 开始到(固定)n)。结果是 scala/spark 产生了意想不到的结果并创建了更少的分区(其中一些是空的)。也许是哈希冲突?

为了解决问题,我试图找出原因并试图找到解决方法。我通过将数据帧转换为 rdd 并将 partitionBy 与 HashPartitioner 一起使用找到了一种解决方法。令我惊讶的是:我得到了预期的结果。但是将数据帧转换为 RDD 对我来说不是一个解决方案,因为它占用了太多资源。

我已经在这个环境上测试过

cloudera CDH 5.9.3 上的 SPARK 2.0

emr-5.17.0 上的 SPARK 2.3.1

这是我的输出测试。请使用 Spark-shell 运行它们

    scala> import org.apache.spark.HashPartitioner
    import org.apache.spark.HashPartitioner

    scala> val mydataindex = Array(0,1, 2, 3,4)
    mydataindex: Array[Int] = Array(0, 1, 2, 3, 4)

    scala> val mydata = sc.parallelize(for 
         |  x <- mydataindex
         |  y <- Array(123,456,789)
         |  yield (x, y), 100)
    mydata: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:27

    scala> val rddMyData = mydata.partitionBy(new HashPartitioner(5))
    rddMyData: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:26

    scala> val rddMyDataPartitions =   rddMyData.mapPartitionsWithIndex
         |                 (index, iterator) => 
         |                    val myList = iterator.toList
         |                    myList.map(x => x + " -> " + index).iterator
         |                 
         |              
    rddMyDataPartitions: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at mapPartitionsWithIndex at <console>:26

    scala>
         | // this is expected:

    scala> rddMyDataPartitions.take(100)
    res1: Array[String] = Array((0,123) -> 0, (0,456) -> 0, (0,789) -> 0, (1,123) -> 1, (1,456) -> 1, (1,789) -> 1, (2,123) -> 2, (2,456) -> 2, (2,789) -> 2, (3,456) -> 3, (3,789) -> 3, (3,123) -> 3, (4,789) -> 4, (4,123) -> 4, (4,456) -> 4)

    scala> val dfMyData = mydata.toDF()
    dfMyData: org.apache.spark.sql.DataFrame = [_1: int, _2: int]

    scala> val dfMyDataRepartitioned = dfMyData.repartition(5,col("_1"))
    dfMyDataRepartitioned: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_1: int, _2: int]

    scala> dfMyDataRepartitioned.explain(false)
    == Physical Plan ==
    Exchange hashpartitioning(_1#3, 5)
    +- *(1) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#3, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#4]
       +- Scan ExternalRDDScan[obj#2]

    scala> val dfMyDataRepartitionedPartition  = dfMyDataRepartitioned.withColumn("partition_id", spark_partition_id()).groupBy("partition_id").count()
    dfMyDataRepartitionedPartition: org.apache.spark.sql.DataFrame = [partition_id: int, count: bigint]

    scala> // this is unexpected, because 1 partition has more indexes

    scala> dfMyDataRepartitionedPartition.show()
    +------------+-----+
    |partition_id|count|
    +------------+-----+
    |           1|    6|
    |           3|    3|
    |           4|    3|
    |           2|    3|
    +------------+-----+

我首先知道 HashPartitioner 被用于对数据帧进行重新分区的方法中,但事实并非如此,因为它适用于 RDD。

谁能指导我这个“Exchange hashpartitioning”(参见上面的解释输出)是如何工作的?

2019-01-16 12:20:这不是 How does HashPartitioner work? 的重复,因为我对 Integer 列上按列(+ 数字分区)重新分区的哈希算法感兴趣。正如您在源代码中看到的那样,一般的 HashPartitioner 正在按预期工作。

【问题讨论】:

How does HashPartitioner work?的可能重复 感谢您的回复,但事实并非如此:正如我已经写过的那样:我尝试使用 HashPartitioner 并且能够理解那里是如何计算哈希的。在这里,我要求按列+分区数重新分区中使用的哈希算法,结果不同 【参考方案1】:

这里没有什么意外的。正如How does HashPartitioner work? 中解释的那样,Spark 使用 hash(key) 模数分区和非均匀分布,尤其是在小型数据集上并非意外。

DatasetRDD 之间的差异也是意料之中的,因为两者都使用不同的哈希函数(同上)。

终于

结果是 scala/spark 产生了意想不到的结果并创建了更少的分区

不是正确的观察。创建的分区数正是要求的

scala> dfMyDataRepartitioned.rdd.getNumPartitions
res8: Int = 5

但是空的在聚合中是不可见的,因为没有对应的值。

【讨论】:

感谢您的回答。但我对哈希函数本身很感兴趣。我正在使用整数字段来重新分区。 Itneger 的哈希码是整数本身。你能告诉我在哪里可以找到哈希函数的实现吗? 你是对的。它正在使用 Murmur3Hash。 github.com/apache/spark/blob/master/sql/core/src/main/scala/org/…

以上是关于“Exchange hashpartitioning”如何在 spark 中工作的主要内容,如果未能解决你的问题,请参考以下文章