hadoop mapreduce 分桶
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hadoop mapreduce 分桶相关的知识,希望对你有一定的参考价值。
参考技术A 老大之前在百度,由于shell 和awk 写的溜,所以他总是推荐 使用shell 和awk 来跑 hadoop streaming 【hs】,hs还真是一个好东西,不需要编译,想怎么执行就怎么整,还不需要IDE,只要你你记住主要的执行内容就完全没有问题,hs也支持 python ,这样一来 hadoop 可以让所有人都可以使用了。可以参考这个文章
https://www.programcreek.com/java-api-examples/index.php?api=org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
https://github.com/roanjain/hadoop-partitioner/blob/master/src/com/mapred/partitioner/PartitionerDemo.java
老大讲在百度 执行hs 时经常分桶,用来控制 map 和 Reduce 任务的个数,确实还是有帮助的,可以控制 桶的个数,也可以控制资源的使用情况,比如 cpu 和内存
一篇干货很多的文章可以参考 http://www.cnblogs.com/van19/p/5756448.html
Hadoop用于对key的排序和分桶的设置选项比较多和复杂,目前在公司内主要以KeyFieldBasePartitioner和KeyFieldBaseComparator被hadoop用户广泛使用。
基本概念:
Partition:分桶过程,用户输出的key经过partition分发到不同的reduce里,因而partitioner就是分桶器,一般用平台默认的hash分桶也可以自己指定。
Key:是需要排序的字段,相同分桶&&相同key的行排序到一起。
下面以一个简单的文本作为例子,通过搭配不同的参数跑出真实作业的结果来演示这些参数的使用方法。
假设map的输出是这样以点号分隔的若干行:
d.1.5.23
e.9.4.5
e.5.9.22
e.5.1.45
e.5.1.23
a.7.2.6
f.8.3.3
我们知道,在streaming模式默认hadoop会把map输出的一行中遇到的第一个设定的字段分隔符前面的部分作为key,后面的作为 value,如果输出的一行中没有指定的字段分隔符,则整行作为key,value被设置为空字符串。 那么对于上面的输出,如果想用map输出的前2个字段作为key,后面字段作为value,并且不使用hadoop默认的“\t”字段分隔符,而是根据该 文本特点使用“.”来分割,需要如何设置呢
bin/hadoop streaming -input /tmp/comp-test.txt -output /tmp/xx -mapper cat -reducer cat
-jobconf stream.num.map.output.key.fields=2
-jobconf stream.map.output.field.separator=.
-jobconf mapred.reduce.tasks=5
结果:
e.9 4.5
f.8 3.3
——————
d.1 5.23
e.5 1.23
e.5 1.45
e.5 9.22
——————
a.7 2.6
总结:
从结果可以看出,在reduce的输出中,前两列和后两列用“\t”分隔,证明map输出时确实把用“.”分隔的前两列作为key,后面的作为 value。并且前两列相同的“e.5”开头的三行被分到了同一个reduce中,证明确实以前两列作为key整体做的partition。
stream.num.map.output.key.fields 设置map输出的前几个字段作为key
stream.map.output.field.separator 设置map输出的字段分隔符
KeyFieldBasePartitioner的用法
如果想要灵活设置key中用于partition的字段,而不是把整个key都用来做partition。就需要使用hadoop中的org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner了。
下面只用第一列作partition,但依然使用前两列作为key。
bin/hadoop streaming -input /tmp/comp-test.txt -output /tmp/xx -mapper cat -reducer cat
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
-jobconf stream.num.map.output.key.fields=2
-jobconf stream.map.output.field.separator=.
-jobconf map.output.key.field.separator=.
-jobconf num.key.fields.for.partition=1
-jobconf mapred.reduce.tasks=5
结果:
d.1 5.23
——————
e.5 1.23
e.5 1.45
e.5 9.22
e.9 4.5
——————
a.7 2.6
f.8 3.3
总结:
从结果可以看出,这次“e”开头的行都被分到了一个桶内,证明做partition是以第一列为准的,而key依然是前两列。并且在同一个 partition内,先按照第一列排序,第一列相同的,按照第二列排序。这里要注意的是使用 map.output.key.field.separator来指定key内字段的分隔符,这个参数是KeyFieldBasePartitioner 和KeyFieldBaseComparator所特有的。
map.output.key.field.separator 设置key内的字段分隔符
num.key.fields.for.partition 设置key内前几个字段用来做partition
事实上KeyFieldBasePartitioner还有一个高级参数 mapred.text.key.partitioner.options,这个参数可以认为是 num.key.fields.for.partition的升级版,它可以指定不仅限于key中的前几个字段用做partition,而是可以单独指定 key中某个字段或者某几个字段一起做partition。
比如上面的需求用mapred.text.key.partitioner.options表示为
mapred.text.key.partitioner.options=-k1,1
注意mapred.text.key.partitioner.options和num.key.fields.for.partition不需要一起使用,一起使用则以num.key.fields.for.partition为准。
这里再举一个例子,使用mapred.text.key.partitioner.options
bin/hadoop streaming -input /tmp/comp-test.txt -output /tmp/xx -mapper cat -reducer cat
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
-jobconf stream.num.map.output.key.fields=3
-jobconf stream.map.output.field.separator=.
-jobconf map.output.key.field.separator=.
-jobconf mapred.text.key.partitioner.options=-k2,3
-jobconf mapred.reduce.tasks=5
结果:
e.9.4 5
——————
a.7.2 6
e.5.9 22
——————
d.1.5 23
e.5.1 23
e.5.1 45
f.8.3 3
可见,这次是以前3列作为key的,而partition则以key中的第2-3列,因此以“e”开头的行被拆散了,但第二三列相同的“5,1”被 分到一个桶内。在同一个桶内,依然是从key的第一列开始排序的,注意,KeyFieldBasePartitioner只影响分桶并不影响排序。
mapred.text.key.partitioner.options 设置key内某个字段或者某个字段范围用做partition
KeyFieldBaseComparator的用法
首先简单解释一下hadoop框架中key的comparator,对于hadoop所识别的所有java的key类型(在框架看来key的类型只 能是java的),很多类型都自定义了基于字节的比较器,比如Text,IntWritable等等,如果不特别指定比较器而使用这些类型默认的,则会将 key作为一个整体的字节数组来进行比较。而KeyFieldBaseComparator则相当于是一个可以灵活设置比较位置的高级比较器,但是它并没 有自己独有的比较逻辑,而是使用默认Text的基于字典序或者通过-n来基于数字比较。
之前的例子使用KeyFieldBasePartitioner自定义了使用key中的部分字段做partition,现在我们通过org.apache.hadoop.mapred.lib.KeyFieldBasedComparator来自定义使用key中的部分字段做比较。
这次把前四列都作为key,前两列做partition,排序依据优先依据第三列正序(文本序),第四列逆序(数字序)的组合排序。
bin/hadoop streaming -input /tmpcomp-test.txt -output /tmp/xx -mapper cat -reducer cat
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
-jobconf mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator
-jobconf stream.num.map.output.key.fields=4
-jobconf stream.map.output.field.separator=.
-jobconf map.output.key.field.separator=.
-jobconf mapred.text.key.partitioner.options=-k1,2
-jobconf mapred.text.key.comparator.options="-k3,3 -k4nr"
-jobconf mapred.reduce.tasks=5
结果:
e.5.1.45
e.5.1.23
d.1.5.23
e.5.9.22
——————
a.7.2.6
——————
f.8.3.3
e.9.4.5
总结:
从结果可以看出,符合预期的按照先第三列文本正序,然后第四列基于数字逆序的排序。
另外注意,如果这种写法
mapred.text.key.comparator.options=”-k2″
则会从第二列开始,用字典序一直比较到key的最后一个字节。所以对于希望准确排序字段的需求,还是使用“k2,2”这种确定首尾范围的形式更好。另外如果给定的key中某些行需要排序的列数不够时,会比较到最后一列,缺列的行默认缺少的那一列排序值最小。
mapred.text.key.comparator.options 设置key中需要比较的字段或字节范围
Python开发MapReduce系列Python实现MapReduce分桶
版权声明:本文为博主原创文章,未经博主允许不得转载
首先,先引出两点来展开下面的话题。
(1)map阶段的排序是在hash之后,写入磁盘之前进行。排序的两个关键字是partition(分区编号)和key。
(2)map结束后,并不是马上写到磁盘的,而是有个环形缓冲区,数据写到缓冲区中,默认溢出率是80%(这个值可以通过属性设置 io.sort.mb),每达到溢出条件就溢出生成一个小文件,直到全部数据写完,最后把所有的小文件合并成一个大文件,并写到磁盘中。这样做的目的是减少磁盘寻道时间,让每个map只输出一个文件,并为这个文件提供索引文件,记录下每个reduce对应数据的偏移量.(其实就是为map与reduce之间的分发建立映射关系)
1、默认情况介绍
在hadoop streaming的默认情况下,是以”\t”作为分隔符的。对于标准输入来说,以每行读取到的数据的第一个”\t”为分界线, 在其之前的部分为key,在其之后的为value。如果一个 "\t" 字符没有,则整行都被当做是key处理。
2、MapReduce shuffler过程中的sort和partition阶段
mapper阶段除了用户代码,最重要的是shuffle 过程,这个过程是MapReduce耗时和消耗资源的主要地方,因为其涉及到磁盘的写入等操作。这里先不谈优化方面的处理,只研究shuffle 过程中的sort和partition两个过程。为什么只研究这两个过程,因为,sort和partition是MapReduce的核心思想,整个过程就是在不断的重复 排列和分割 的操作。
从第1点可以知道,MapReduce的key默认是以 \t 分割得到的,我们能不能根据自己的需要来获取到特定形式的key?实现类似分桶、根据指定列的排序之类的自由排序呢?答案是可以的。我们可以通过以下的参数来实现:
3、相关的参数介绍
3.1map阶段
-jobconf mapred.reduce.tasks=2【此属性针对下面的例子都有效】 map.output.key.field.separator:指定map输出<key,value>对之后,其中key内部的分割符。 num.key.fields.for.partition:指定分桶时,按照分隔符切割后,用于分桶key所占的列数。 -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner:前两个参数,要配合这个partitioner,没有的话会报错 例如:map.output.key.field.separator = , num.key.fields.for.partition = 2 -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 一行数据:1 , 2 , 3 , 4 , 5(在这里1 2 之间的逗号是key内部的分隔符,并且1,2格式key的数据分为到同一桶) stream.map.output.field.separator: map中的key与value的分隔符 stream.num.map.output.key.fields:指定map输出按照分隔符切割后,key所占有的列数,之前的是key,之后的是value 例如:map.output.key.field.separator = , num.key.fields.for.partition = 2 -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner stream.map.output.field.separator = : stream.num.map.output.key.fields = 3 输入:1 , 2 , 3 , 4 , 5 1 , 2 , 2 , 4 , 5 1 , 3 , 4 , 4 , 5 1 , 3 , 3 , 4 , 5 输出part-00000:1 , 2 , 2 : 4 , 5 1 , 2 , 3 : 4 , 5 输出part-00000:1 , 3 , 3 : 4 , 5 1 , 3 , 4 : 4 , 5 1 , 2 是分桶值,1 , 2 , 3是key, 4 , 5是value。在这里1 2 之间的逗号是key内部的分隔符,1 , 2格式key的数据分为到同一桶
3.2 reduce阶段
stream.reduce.output.field.separator:reduce中key与value的分隔符
stream.num.reduce.output.key.fields:reduce中分隔符的位置
3、分桶测试
run.sh脚本(作为一个会偷懒的程序猿,能偷懒就偷懒,写个脚本省掉每次写入一大串指令的烦恼)
HADOOP_CMD="/home/hadoop/hadoop/bin/hadoop" STREAM_JAR_PATH="/home/hadoop/hadoop/contrib/streaming/hadoop-streaming-1.2.1.jar" INPUT_PATH_A="/a.txt" INPUT_PATH_B="/b.txt" OUTPUT_PATH="/output" $HADOOP_CMD fs -rmr $OUTPUT_PATH #mapreduce在运行时,文件系统不能存在output目录(目录名字随意) $HADOOP_CMD jar $STREAM_JAR_PATH -input $INPUT_FILE_PATH_A,$INPUT_FILE_PATH_B -output $OUTPUT_SORT_PATH -mapper "python map.py" -reducer "python red.py" -file ./map.py -file ./red.py -jobconf mapred.reduce.tasks=2 -jobconf map.output.key.field.separator=, -jobconf num.key.fields.for.partition=1 \ -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner -jobconf stream.map.output.field.separator=: -jobconf stream.num.map.output.key.fields=3
a.txt内容
1,2,3:hadoop 1,2,1:hadoop 1,2,5:hadoop 1,3,4:hadoop 1,2,9:hadoop 1,2,11:hadoop 1,2,7:hadoop 1,3,15:hadoop 1,3,14:hadoop 1,2,19:hadoop
b.txt内容
1,2,0:java 1,2,2:java 1,2,8:java 1,3,4:java 1,2,2:java 1,2,14:java 1,2,12:java 1,3,1:java 1,3,5:java 1,2,3:java
4、结果输出
【part-00000】输出内容如下:
1,2,0:java 1,2,1:hadoop 1,2,2:java 1,2,2:java 1,2,3:hadoop 1,2,3:java 1,2,5:hadoop 1,2,7:hadoop 1,2,8:java 1,2,9:hadoop 1,2,11:hadoop 1,2,14:java 1,2,19:hadoop
【part-00001】输出内容如下:
1,3,1:java 1,3,4:hadoop 1,3,4:java 1,3,5:java 1,3,14:hadoop 1,3,15:hadoop
5、结果分析
由结果可以看出:
(1)以前2列为分桶标志,因为part-00000,part-00001分别以1,2和1,3开头。
(2)以前3列为key,并且第3列为分桶之后排序的key。
(3)key内部之间是以 , 分隔。
(4)key与value之间是以 : 分隔。
参考:
(1)《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》
以上是关于hadoop mapreduce 分桶的主要内容,如果未能解决你的问题,请参考以下文章
使用 mapred 或 mapreduce 包创建 Hadoop 作业哪个更好?
hadoop.mapred vs hadoop.mapreduce?
Hadoop MapReduce 新旧 mapred 与 mapreduce API
解决 mapreduce.Cluster 无法使用 org.apache.hadoop.mapred.YarnClientProtocolProvider 由于实例化 YarnClient 错误