Hadoop 分区器

Posted

技术标签:

【中文标题】Hadoop 分区器【英文标题】:Hadoop partitioner 【发布时间】:2014-12-22 00:14:44 【问题描述】:

我想问一下 ,它是在 Mappers 中实现的吗?如何衡量使用默认哈希分区器的性能 - 是否有更好的分区器来减少数据倾斜?

谢谢

【问题讨论】:

【参考方案1】:

分区器不在 Mapper 中。

以下是每个 Mapper 中发生的过程 -

每个映射任务都将其输出写入循环缓冲存储器(而不是磁盘)。 当缓冲区达到阈值时,后台线程开始将内容溢出到磁盘。 [缓冲区大小由 mapreduce.task.io.sort.mb 属性控制,默认为 100 MB,溢出由 mapreduce.io.sort.spill.percent 属性控制,默认为 0.08 或 80%]。在溢出到磁盘之前 数据被分区对应于它们将被发送到的减速器 在每个分区中按键执行内存排序 对每个排序的结果运行组合器函数(减少写入和传输的数据,这需要专门完成) 压缩(可选)[mapred.compress.map.output=true; mapred.map.output.compression.codec=CodecName] 写入磁盘和输出文件的分区可通过 HTTP 提供给 reducer。

以下是每个 Reducer 中发生的过程

现在每个 Reducer 收集每个映射器的所有文件,它进入排序/合并阶段(排序已在映射器端完成),合并所有映射输出并保持排序顺序。

李>

在归约阶段,针对排序输出中的每个键调用归约函数。

下面是代码,说明了键分区的实际过程。 getpartition() 将根据其哈希码返回特定密钥必须发送到的分区号/reducer。 Hashcode 对于每个键都必须是唯一的,并且在整个环境中,Hashcode 对于键应该是唯一且相同的。为此,hadoop 为其键实现了自己的哈希码,而不是使用 java 默认哈希码。

 Partition keys by their hashCode(). 

        public class HashPartitioner<K, V> extends Partitioner<K, V> 
        public int getPartition(K key, V value,
                                 int numReduceTasks) 
           return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
       

       

【讨论】:

【参考方案2】:

Partitioner 是介于 Mappers 和 Reducers 之间的关键组件。它在 Reducer 之间分发地图发出的数据。

Partitioner 在每个 Map Task JVM(java 进程)中运行。

默认分区器HashPartitioner基于哈希函数工作,与TotalOrderPartitioner等其他分区器相比,它的速度非常快。它在每个地图输出键上运行哈希函数,即:

Reduce_Number = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

要检查 Hash Partitioner 的性能,请使用 Reduce 任务计数器并查看 reducer 之间的分布情况。

Hash Partitioner 是基本的分区器,不适合处理高偏度的数据。

为了解决数据倾斜问题,我们需要编写自定义分区器类,从 MapReduce API 扩展 Partitioner.java 类。

自定义分区器的示例类似于RandomPartitioner。这是在 reducer 之间平均分配倾斜数据的最佳方法之一。

【讨论】:

以上是关于Hadoop 分区器的主要内容,如果未能解决你的问题,请参考以下文章

如果在 Hadoop Map Reduce 中定义了自定义分区器,默认哈希分区器是不是仍然有效?

Hadoop Oozie MapReduce 操作自定义分区器

如何在python中为Hadoop Map Reduce作业编写组合器和分区器?我如何在Hadoop Job中调用它

如何修复 hadoop 中的“非法分区”错误?

未调用 hadoop mapreduce 分区程序

Hadoop学习之路MapReduce自定义分区实现