Spark自定义分区(Partitioner)

Posted 大葱拌豆腐

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark自定义分区(Partitioner)相关的知识,希望对你有一定的参考价值。

我们都知道Spark内部提供了HashPartitionerRangePartitioner两种分区策略,这两种分区策略在很多情况下都适合我们的场景。但是有些情况下,Spark内部不能符合咱们的需求,这时候我们就可以自定义分区策略。为此,Spark提供了相应的接口,我们只需要扩展Partitioner抽象类,然后实现里面的三个方法:

01
    package org.apache.spark
02
     
03
    /**
04
     * An object that defines how the elements in a key-value pair RDD are partitioned by key.
05
     * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
06
     */
07
    abstract class Partitioner extends Serializable {
08
      def numPartitions: Int
09
      def getPartition(key: Any): Int
10
    }

def numPartitions: Int:这个方法需要返回你想要创建分区的个数;

def getPartition(key: Any): Int:这个函数需要对输入的key做计算,然后返回该key的分区ID,范围一定是0到numPartitions-1

equals():这个是Java标准的判断相等的函数,之所以要求用户实现这个函数是因为Spark内部会比较两个RDD的分区是否一样。

  假如我们想把来自同一个域名的URL放到一台节点上,比如:http://www.iteblog.comhttp://www.iteblog.com/archives/1368,如果你使用HashPartitioner,这两个URL的Hash值可能不一样,这就使得这两个URL被放到不同的节点上。所以这种情况下我们就需要自定义我们的分区策略,可以如下实现:

01
    package com.iteblog.utils
02
     
03
    import org.apache.spark.Partitioner
04
     
05
    /**
06
     * User: 过往记忆
07
     * Date: 2015-05-21
08
     * Time: 下午23:34
09
     * bolg: http://www.iteblog.com
10
     * 本文地址:http://www.iteblog.com/archives/1368
11
     * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
12
     * 过往记忆博客微信公共帐号:iteblog_hadoop
13
     */
14
     
15
    class IteblogPartitioner(numParts: Int) extends Partitioner {
16
      override def numPartitions: Int = numParts
17
     
18
      override def getPartition(key: Any): Int = {
19
        val domain = new java.net.URL(key.toString).getHost()
20
        val code = (domain.hashCode % numPartitions)
21
        if (code < 0) {
22
          code + numPartitions
23
        } else {
24
          code
25
        }
26
      }
27
     
28
      override def equals(other: Any): Boolean = other match {
29
        case iteblog: IteblogPartitioner =>
30
          iteblog.numPartitions == numPartitions
31
        case _ =>
32
          false
33
      }
34
     
35
      override def hashCode: Int = numPartitions
36
    }

因为hashCode值可能为负数,所以我们需要对他进行处理。然后我们就可以在partitionBy()方法里面
使用我们的分区:

1
    iteblog.partitionBy(new IteblogPartitioner(20))

  类似的,在Java中定义自己的分区策略和Scala类似,只需要继承org.apache.spark.Partitioner,并实现其中的方法即可。

  在Python中,你不需要扩展Partitioner类,我们只需要对iteblog.partitionBy()加上一个额外的hash函数,如下:

1
    import urlparse
2
     
3
    def iteblog_domain(url):
4
      return hash(urlparse.urlparse(url).netloc)
5
     
6
    iteblog.partitionBy(20, iteblog_domain)

 

以上是关于Spark自定义分区(Partitioner)的主要内容,如果未能解决你的问题,请参考以下文章

MapReduce之自定义分区器Partitioner

Hadoop自定义分区Partitioner

MapReduce之自定义Partitioner

Hadoop中的MapReduce框架原理自定义Partitioner步骤在Job驱动中,设置自定义PartitionerPartition 分区案例

自定义分区错误

Mongo Spark Connector中的分区器