MR 数据过滤
Posted Shaw_喆宇
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MR 数据过滤相关的知识,希望对你有一定的参考价值。
MapReduce 作业代码(FilterJob):
package com.euphe.filter; import com.euphe.util.HUtils; import com.euphe.util.Utils; import com.euphe.util.standardUtil.Location; import com.euphe.util.standardUtil.StringListTools; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import java.io.IOException; import java.util.ArrayList; import java.util.List; import static com.euphe.util.standardUtil.Shufflter.shufflter; public class FilterJob extends Configured implements Tool { public static class Map extends Mapper<Object, Text, Text, Text> { private static Text text = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); List<String> FirstList = new ArrayList<String>(); FirstList = StringListTools.StringToList(line,"\t"); String time = FirstList.get(Location.DATE_TIME); context.write(new Text(time), new Text(line)); } } public static class Reduce extends Reducer<Text, Text, NullWritable, Text> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { try{ String line = ""; String result = ""; boolean fflg = false; List<String> resultList = new ArrayList<String>(); for(Text value : values){ line = value.toString(); fflg = shufflter(line); if(fflg){ resultList.add(line); } } result = StringListTools.ListToString(resultList, "\n"); context.write(NullWritable.get(), new Text(result)); }catch (Exception e){ e.printStackTrace(); } } } @Override public int 
run(String[] args) throws Exception { Configuration conf = HUtils.getConf(); conf.set("mapreduce.job.jar", Utils.getRootPathBasedPath("WEB-INF/jars/filter.jar")); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();//解析命令行参数 if (otherArgs.length !=2) {//要求必须有输入和输出路径两个参数 System.err.println("Usage: com.euphe.filter.FilterJob <in> <out>"); System.exit(2); } Job job = Job.getInstance(conf,"Filter input :"+otherArgs[0]+" to "+otherArgs[1]); job.setJarByClass(FilterJob.class); job.setMapperClass(FilterJob.Map.class); job.setReducerClass(FilterJob.Reduce.class); job.setNumReduceTasks(1); //设置map输出的key value job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //设置reducer输出的key,value类型 job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job,new Path(otherArgs[1])); FileSystem.get(conf).delete(new Path(otherArgs[1]), true);//调用任务前先删除输出目录 return job.waitForCompletion(true) ? 0 : 1; } }
shufflter函数:
package com.euphe.util.standardUtil;

import java.util.ArrayList;
import java.util.List;

/**
 * Line-level filter predicate applied to tab-separated log records.
 */
public class Shufflter {

    /** Highest column index inspected by {@link #shufflter(String)}. */
    private static final int MAX_INDEX =
            Math.max(Math.max(Location.DK1, Location.osFamily),
                     Math.max(Location.uaFamily, Location.type));

    /**
     * Decides whether a tab-separated log line should be kept.
     *
     * <p>A line is rejected only when DK1 is the literal string {@code "null"} and osFamily,
     * uaFamily and type are all {@code "unknown"}, presumably records that carry no useful
     * attributes. A {@code null} line, or one with too few fields to inspect all four
     * columns, is kept. This is decided by an explicit bounds check rather than by catching
     * an exception and printing a stack trace for every such record.
     *
     * @param line one raw record; may be {@code null}
     * @return {@code false} to drop the line, {@code true} to keep it
     */
    public static boolean shufflter(String line) {
        List<String> fields = StringListTools.StringToList(line, "\t");
        if (fields == null || fields.size() <= MAX_INDEX) {
            return true;
        }
        // Constant-first equals: never throws even if a field value were null.
        boolean uninformative = "null".equals(fields.get(Location.DK1))
                && "unknown".equals(fields.get(Location.osFamily))
                && "unknown".equals(fields.get(Location.uaFamily))
                && "unknown".equals(fields.get(Location.type));
        return !uninformative;
    }
}
Location工具:
package com.euphe.util.standardUtil;

/**
 * Zero-based column indices into a tab-separated log record.
 */
public class Location {

    // ---- Columns of the raw input record ----

    /** Index of the date/time column in the raw data. */
    public static final int DATE_TIME = 0;

    // ---- Columns inspected during the shufflter (filtering) stage ----

    /** Index of the {@code DK1} column. */
    public static final int DK1 = 5;

    /** Index of the {@code osFamily} column (presumably the operating-system family). */
    public static final int osFamily = 8;

    /** Index of the {@code uaFamily} column (presumably the user-agent family). */
    public static final int uaFamily = 9;

    /** Index of the {@code type} column. */
    public static final int type = 13;
}
StringListTools 工具类:
package com.euphe.util.standardUtil; import java.util.ArrayList; import java.util.List; public class StringListTools { public static List<String> StringToList(String str, String seperator){ //该函数读入日志文件的一行,根据分隔符将各个项保存到List中 if(str == null) return null; List<String> strList = new ArrayList<String>(); String[] strArray = str.split(seperator); for(String text : strArray) strList.add(text); return strList; } public static String ListToString(List<String> tempList, String seperator) { //该函数根据分隔符将List保存为String if (tempList == null) return null; String temp = new String(); for(int i = 0; i < tempList.size()-1; i++){ temp = temp + tempList.get(i) + seperator; } temp = temp + tempList.get(tempList.size()-1); return temp; } }
以上是关于MR 数据过滤的主要内容,如果未能解决你的问题,请参考以下文章