MapReduce Small-File Optimization and Partitioning
Posted by areyouready
I. Small-File Optimization
1. Mapper class
package com.css.combine;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * WordCount: count occurrences of each word by emitting <word, 1> pairs.
 *
 * Type parameters:
 *   KEYIN    - starting byte offset of the line (0~10, 11~20, 21~30, ...)
 *   VALUEIN  - the line of text
 *   KEYOUT   - key type emitted to the reduce phase
 *   VALUEOUT - value type emitted to the reduce phase,
 *              e.g. <China,1> <Beijing,1> <love,1>
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // key: starting byte offset; value: the line; context: job context
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Read the line
        String line = value.toString();
        // 2. Split on spaces, e.g. "love Beijing"
        String[] words = line.split(" ");
        // 3. Emit <love,1> <Beijing,1> ... to the next stage
        for (String w : words) {
            // 4. Write to the reducer
            context.write(new Text(w), new IntWritable(1));
        }
    }
}
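A small aside on efficiency: the loop above allocates a new Text and IntWritable for every word. Because the framework serializes the key/value as soon as write() returns, the output objects can safely be reused across calls. A sketch of the same map method with object reuse (an optional tweak, not part of the original code):

    private final Text word = new Text();                       // reused output key
    private static final IntWritable ONE = new IntWritable(1);  // constant output value

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String w : value.toString().split(" ")) {
            word.set(w);
            context.write(word, ONE); // serialized immediately, so reuse is safe
        }
    }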
2. Reducer class
package com.css.combine;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Count how many times this word occurred
        int sum = 0;
        // Accumulate the 1s emitted by the mappers
        for (IntWritable count : values) {
            sum += count.get();
        }
        // Write the final <word, total> pair
        context.write(key, new IntWritable(sum));
    }
}
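Because this reduce logic is plain summation (associative and commutative), the same class can also serve as a combiner, pre-aggregating counts on the map side and shrinking the data shuffled to the reducers. One optional line in the driver (not in the original code) enables this:

    // Optional: run WordCountReducer on each mapper's local output before the shuffle
    job.setCombinerClass(WordCountReducer.class);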
3. Driver class
package com.css.combine;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Get the job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Locate the jar by the driver class
        job.setJarByClass(WordCountDriver.class);

        // Register the custom mapper and reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // Map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Reduce (final) output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Use CombineTextInputFormat instead of the default TextInputFormat, so that
        // many small files are packed into a few splits (small-file optimization)
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4 MB max
        CombineTextInputFormat.setMinInputSplitSize(job, 3145728); // 3 MB min

        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path("c:/in1024/"));
        FileOutputFormat.setOutputPath(job, new Path("c:/out1024/"));

        // Submit the job; print 0 on success, 1 on failure
        boolean rs = job.waitForCompletion(true);
        System.out.println(rs ? 0 : 1);
    }
}
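To see the combining effect without running the full job, the generated splits can be inspected directly, since each split becomes one map task. A minimal local sketch (SplitInspector is a hypothetical helper, and it assumes c:/in1024/ exists and holds several small files):

package com.css.combine;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitInspector {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // same 4 MB cap as the driver
        FileInputFormat.setInputPaths(job, new Path("c:/in1024/"));
        // With plain TextInputFormat there would be one split per small file;
        // CombineTextInputFormat packs them up to the configured maximum size.
        List<InputSplit> splits = new CombineTextInputFormat().getSplits(job);
        System.out.println("number of splits (= map tasks): " + splits.size());
    }
}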
II. Partitioning
1. Mapper class
package com.css.flow.partition;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Input line:
 * 3631279850362 13726130503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 www.itstaredu.com 教育网站 24 27 299 681 200
 *
 * Desired output:
 * 13726130503 299 681 980
 */
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Read the line
        String line = value.toString();
        // 2. Split into fields
        String[] fields = line.split(" ");
        // 3. Pick out the key fields (simple data cleaning); counting from the
        //    end of the record tolerates lines with missing middle columns
        String phoneN = fields[1];
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long dfFlow = Long.parseLong(fields[fields.length - 2]);
        // 4. Emit <phone number, flow bean> to the reduce side
        context.write(new Text(phoneN), new FlowBean(upFlow, dfFlow));
    }
}
2. Reducer class
package com.css.flow.partition;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        // 1. Re-aggregate the traffic of the same phone number
        long upFlow_sum = 0;
        long dfFlow_sum = 0;
        // 2. Accumulate
        for (FlowBean f : values) {
            upFlow_sum += f.getUpFlow();
            dfFlow_sum += f.getDfFlow();
        }
        FlowBean rs = new FlowBean(upFlow_sum, dfFlow_sum);
        // 3. Write the result
        context.write(key, rs);
    }
}
3. Bean class
package com.css.flow.partition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Bean class used to ship the flow data between the map and reduce phases.
 */
public class FlowBean implements Writable {

    // Fields
    private long upFlow;   // upstream traffic
    private long dfFlow;   // downstream traffic
    private long flowSum;  // total traffic

    // A no-arg constructor is required for deserialization
    public FlowBean() {
    }

    public FlowBean(long upFlow, long dfFlow) {
        this.upFlow = upFlow;
        this.dfFlow = dfFlow;
        this.flowSum = upFlow + dfFlow;
    }

    // Deserialization: fields must be read in the order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dfFlow = in.readLong();
        flowSum = in.readLong();
    }

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dfFlow);
        out.writeLong(flowSum);
    }

    @Override
    public String toString() {
        return upFlow + " " + dfFlow + " " + flowSum;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDfFlow() {
        return dfFlow;
    }

    public void setDfFlow(long dfFlow) {
        this.dfFlow = dfFlow;
    }

    public long getFlowSum() {
        return flowSum;
    }

    public void setFlowSum(long flowSum) {
        this.flowSum = flowSum;
    }
}
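The Writable contract requires readFields() to consume fields in exactly the order write() emits them; getting this wrong silently scrambles values. A quick round-trip sketch to verify it (FlowBeanRoundTrip is hypothetical test code, not part of the original):

package com.css.flow.partition;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws Exception {
        // Serialize: write() emits upFlow, dfFlow, flowSum in that order
        FlowBean original = new FlowBean(299, 681);
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize: readFields() must consume the longs in the same order
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(copy); // 299 681 980
    }
}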
4. Partitioner class
package com.css.flow.partition;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PhoneNumPartitioner extends Partitioner<Text, FlowBean> {

    // Partition by the first three digits of the phone number
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // First three digits of the phone number
        String phoneNum = key.toString().substring(0, 3);
        // Default partition for any other prefix
        int partitioner = 4;
        if ("135".equals(phoneNum)) {
            return 0;
        } else if ("137".equals(phoneNum)) {
            return 1;
        } else if ("138".equals(phoneNum)) {
            return 2;
        } else if ("139".equals(phoneNum)) {
            return 3;
        }
        return partitioner;
    }
}
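For comparison: when no partitioner is registered, Hadoop falls back to HashPartitioner, which spreads keys by hash code rather than by a business rule. Its logic is essentially the following:

    // Equivalent of org.apache.hadoop.mapreduce.lib.partition.HashPartitioner:
    // mask off the sign bit, then take the remainder modulo the reducer count
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }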
5. Driver class
package com.css.flow.partition;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Locate the jar by the driver class
        job.setJarByClass(FlowCountDriver.class);

        // 3. Register the custom mapper and reducer
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // 4. Map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5. Reduce (final) output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Register the custom partitioner.
        // The number of reduce tasks must be at least the number of partitions
        // the partitioner can return (here 5: indices 0 through 4).
        job.setPartitionerClass(PhoneNumPartitioner.class);
        job.setNumReduceTasks(5);

        // 6. Input and output paths
        FileInputFormat.setInputPaths(job, new Path("c:/flow1020/in"));
        FileOutputFormat.setOutputPath(job, new Path("c:/flow1020/out"));

        // 7. Submit the job; print 0 on success, 1 on failure
        boolean rs = job.waitForCompletion(true);
        System.out.println(rs ? 0 : 1);
    }
}
6. Input file: HTTP_20180313143750.dat
3631279850362 13726130503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 www.itstaredu.com 教育网站 24 27 299 681 200
3631279950322 13822544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 www.taobao.com 淘宝网 4 0 264 0 200
3631279910362 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
3631244000322 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
3631279930342 18212575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
3631279950342 13884138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
3631279930352 13510439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
3631279950332 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 316 296 200
3631279830392 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
3631279840312 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 660 690 200
3631279730382 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 369 338 200
3631279860392 15889002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 938 380 200
3631279920332 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
3631279860312 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 120 1320 200
3631279840302 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 198 910 200
3631279950332 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
3631279820302 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 735 11349 400
3631279860322 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 212 200
3631279900332 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 4243 200
3631279880322 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
3631279850362 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
3631279930352 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1136 94 200
3631279930353 13560436326 C4-17-FE-BA-DE-D9:CMCC 120.196.100.77 lol.qq.com/ 英雄联盟 18 15 1136 94 200
7. Output files
(1) part-r-00000
13502468823 735 11349 12084
13510439658 1116 954 2070
13560436326 1136 94 1230
13560436666 1136 94 1230
13560439658 918 4938 5856

(2) part-r-00001
13719199419 240 0 240
13726130503 299 681 980
13726238888 2481 24681 27162
13760778710 120 120 240

(3) part-r-00002
13822544101 264 0 264
13884138413 4116 1432 5548

(4) part-r-00003
13922314466 3008 3720 6728
13925057413 11058 4243 15301
13926251106 240 0 240
13926435656 132 1512 1644

(5) part-r-00004
13480253104 120 1320 1440
13602846565 198 910 1108
13660577991 660 690 1350
15013685858 369 338 707
15889002119 938 380 1318
15920133257 316 296 612
18212575961 1527 2106 3633
18320173382 9531 212 9743