hadoop之mapreduce编程实例(系统日志初步清洗过滤处理)

Posted zfszhangyuan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hadoop之mapreduce编程实例(系统日志初步清洗过滤处理)相关的知识,希望对你有一定的参考价值。

刚刚开始接触hadoop的时候,总觉得必须要先安装hadoop集群才能开始学习MR编程,其实并不用这样,当然如果你有条件有机器那最好是自己安装配置一个hadoop集群,这样你会更容易理解其工作原理。我们今天就是要给大家演示如何不用安装hadoop直接调试编程MapReduce函数。

开始之前我们先来理解一下mapreduce的工作原理:

hadoop集群由DataNode和NameNode两种节点构成:DataNode负责存储数据本身,NameNode负责存储数据的元数据信息。在启动mapreduce任务时,数据首先通过InputFormat模块从集群的文件系统(HDFS)中读出,然后按照设定的split size进行切分(Split,默认大小等于一个block,即128MB);再由RecordReader(RR)将每个split中的数据逐行读出,交给map函数。map函数按照编程的代码逻辑进行处理,输出key和value。由map到reduce的处理过程中包含三件事情:Combiner(map端的预处理,相当于在map端先做一次reduce)、Partitioner(负责将map的输出数据均衡地分配给各个reduce)、Shuffle & Sort(根据map输出的key进行洗牌和排序,并根据partitioner的分配情况将结果传输给指定的reduce)。最后reduce按照代码逻辑处理并输出结果(也是key、value格式)。

注意:

map阶段的key-value对的格式是由输入格式决定的。如果使用默认的TextInputFormat,则每行作为一条记录进行处理,其中key为此行开头相对于文件起始位置的字节偏移量(LongWritable),value就是此行的文本内容(Text)
map阶段输出的key-value对的格式必须与reduce阶段输入的key-value对的格式相对应

下面是wordcount的处理过程大家来理解一下:

现在我们开始我们的本地MR编程吧

首先我们需要去官网下载一个hadoop安装包(本文用的是hadoop 2.6.0版本,不用安装,我们只需要包中的jar包)

下载链接:https://archive.apache.org/dist/hadoop/common/(下载最多的那个就可以了,版本自己选个)

下面就上MR的代码吧:

package loganalysis;
import java.io.IOException;
import java.util.StringTokenizer;
import java.lang.*;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

 
public class WordCount 
 
  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>
 
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    private String imei = new String();
    private String areacode  = new String();
    private String responsedata = new String();
    private String requesttime = new String();
    private String requestip = new String();

//    map阶段的key-value对的格式是由输入的格式所决定的,如果是默认的TextInputFormat,则每行作为一个记录进程处理,其中key为此行的开头相对于文件的起始位置,value就是此行的字符文本
//    map阶段的输出的key-value对的格式必须同reduce阶段的输入key-value对的格式相对应
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException 
      //StringTokenizer itr = new StringTokenizer(value.toString());
      
      int areai = value.toString().indexOf("areacode", 21);
      int imeii = value.toString().indexOf("imei", 21);
      int redatai = value.toString().indexOf("responsedata", 21);
      int retimei = value.toString().indexOf("requesttime", 21);
      int reipi = value.toString().indexOf("requestip", 21);
      
      if (areai==-1)
       areacode=""; 
      else
      
      areacode=value.toString().substring(areai+11);
      int len2=areacode.indexOf("\\"");
      if(len2 <= 1)
        
    	  areacode="";
        
      else 
        
    	  areacode=areacode.substring(0,len2);
        
      
      
      if (imeii==-1)
       imei=""; 
      else
      
    	  imei=value.toString().substring(imeii+9);
      int len2=imei.indexOf("\\\\");
      if(len2 <= 1)
        
    	  imei="";
        
      else 
        
    	  imei=imei.substring(0,len2);
        
      
      
     
      
      if (redatai==-1)
       responsedata=""; 
      else
      
    	  responsedata=value.toString().substring(redatai+15);
      int len2=responsedata.indexOf("\\"");
      if(len2 <= 1)
        
    	  responsedata="";
        
      else 
        
    	  responsedata=responsedata.substring(0,len2);
        
      
      
      
      
      if (retimei==-1)
       requesttime=""; 
      else
      
    	  requesttime=value.toString().substring(retimei+14);
      int len2=requesttime.indexOf("\\"");
      if(len2 <= 1)
        
    	  requesttime="";
        
      else 
        
    	  requesttime=requesttime.substring(0,len2);
        
      
      
      
      if (reipi==-1)
       requestip=""; 
      else
      
    	  requestip=value.toString().substring(reipi+12);
      int len2=requestip.indexOf("\\"");
      if(len2 <= 1)
        
    	  requestip="";
        
      else 
        
    	  requestip=requestip.substring(0,len2);
        
      
      
     /* while (itr.hasMoreTokens()) 
    	  string tim;
    	  
        word.set(itr.nextToken());
        context.write(word, one);
      */
     if(imei!=""&&areacode!=""&&responsedata!=""&&requesttime!=""&&requestip!="")
     
       String wd=new String();
       wd=imei+"\\t"+areacode+"\\t"+responsedata+"\\t"+requesttime+"\\t"+requestip;
       //wd="areacode|"+areacode +"|imei|"+ imei +"|responsedata|"+ responsedata +"|requesttime|"+ requesttime +"|requestip|"+ requestip;
       word.set(wd);
       context.write(word, one);
     

    
  
 
  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> 
    private IntWritable result = new IntWritable();
 
    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException 
      int sum = 0;
      for (IntWritable val : values) 
        sum += val.get();
      
      result.set(sum);
      context.write(key, result);
    
  
 
  public static void main(String[] args) throws Exception 
    Configuration conf = new Configuration();
  //  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    String[] otherArgs=new String[]"/Users/mac/tmp/inputmr","/Users/mac/tmp/output1";
    
    if (otherArgs.length != 2) 
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    
    
	//Job job = new Job(conf, "word count");
    Job job = Job.getInstance(conf);
    
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  

注意:以上代码除了JDK 1.7自带的类库之外,其他依赖的jar包都来自hadoop安装包中的share目录下

如果你不知道需要哪些包,那就将share/hadoop/下面所有的jar包都添加到项目中


注意:我的电脑是Mac Pro,如果你的是Windows机器,相关的路径需要修改一下,在前面加上“file:///”(例如 file:///D:/tmp/input 和 file:///D:/tmp/output):

 String[] otherArgs = new String[]{"file:///D:/tmp/input", "file:///D:/tmp/output"};
这个程序的核心代码都在map中,主要是提取系统日志中相关的核心字段,拼接成key返回给reduce,value都设置为1,是为了方便以后的统计。因为只是示例,所以简单地取了几个字段,实际日志中的字段远不止这些。

下面给下测试的系统日志:

2016-04-18 16:00:00 "areacode":"浙江省丽水市","countAll":0,"countCorrect":0,"datatime":"4134362","logid":"201604181600001184409476","requestinfo":"\\"sign\\":\\"4\\",\\"timestamp\\":\\"1460966390499\\",\\"remark\\":\\"4\\",\\"subjectPro\\":\\"123456\\",\\"interfaceUserName\\":\\"12345678900987654321\\",\\"channelno\\":\\"100\\",\\"imei\\":\\"12345678900987654321\\",\\"subjectNum\\":\\"13989589062\\",\\"imsi\\":\\"12345678900987654321\\",\\"queryNum\\":\\"13989589062\\"","requestip":"36.16.128.234","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"无查询结果"
2016-04-18 16:00:00 "areacode":"宁夏银川市","countAll":0,"countCorrect":0,"datatime":"4715990","logid":"201604181600001858043208","requestinfo":"\\"sign\\":\\"4\\",\\"timestamp\\":\\"1460966400120\\",\\"remark\\":\\"4\\",\\"subjectPro\\":\\"123456\\",\\"interfaceUserName\\":\\"12345678900987654321\\",\\"channelno\\":\\"1210\\",\\"imei\\":\\"A0000044ABFD25\\",\\"subjectNum\\":\\"15379681917\\",\\"imsi\\":\\"460036951451601\\",\\"queryNum\\":\\"\\"","requestip":"115.168.93.87","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"无查询结果","userAgent":"ZTE-Me/Mobile"
2016-04-18 16:00:00 "areacode":"黑龙江省哈尔滨市","countAll":0,"countCorrect":0,"datatime":"5369561","logid":"201604181600001068429609","requestinfo":"\\"interfaceUserName\\":\\"12345678900987654321\\",\\"queryNum\\":\\"\\",\\"timestamp\\":\\"1460966400139\\",\\"sign\\":\\"4\\",\\"imsi\\":\\"460030301212545\\",\\"imei\\":\\"35460207765269\\",\\"subjectNum\\":\\"55588237\\",\\"subjectPro\\":\\"123456\\",\\"remark\\":\\"4\\",\\"channelno\\":\\"2100\\"","requestip":"42.184.41.180","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"无查询结果"
2016-04-18 16:00:00 "areacode":"浙江省丽水市","countAll":0,"countCorrect":0,"datatime":"4003096","logid":"201604181600001648238807","requestinfo":"\\"sign\\":\\"4\\",\\"timestamp\\":\\"1460966391025\\",\\"remark\\":\\"4\\",\\"subjectPro\\":\\"123456\\",\\"interfaceUserName\\":\\"12345678900987654321\\",\\"channelno\\":\\"100\\",\\"imei\\":\\"12345678900987654321\\",\\"subjectNum\\":\\"13989589062\\",\\"imsi\\":\\"12345678900987654321\\",\\"queryNum\\":\\"13989589062\\"","requestip":"36.16.128.234","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"无查询结果"
2016-04-18 16:00:00 "areacode":"广西南宁市","countAll":0,"countCorrect":0,"datatime":"4047993","logid":"201604181600001570024205","requestinfo":"\\"sign\\":\\"4\\",\\"timestamp\\":\\"1460966382871\\",\\"remark\\":\\"4\\",\\"subjectPro\\":\\"123456\\",\\"interfaceUserName\\":\\"12345678900987654321\\",\\"channelno\\":\\"1006\\",\\"imei\\":\\"A000004853168C\\",\\"subjectNum\\":\\"07765232589\\",\\"imsi\\":\\"460031210400007\\",\\"queryNum\\":\\"13317810717\\"","requestip":"219.159.72.3","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"无查询结果"
2016-04-18 16:00:00 "areacode":"海南省五指山市","countAll":0,"countCorrect":0,"datatime":"5164117","logid":"201604181600001227842048","requestinfo":"\\"sign\\":\\"4\\",\\"timestamp\\":\\"1460966399159\\",\\"remark\\":\\"4\\",\\"subjectPro\\":\\"123456\\",\\"interfaceUserName\\":\\"12345678900987654321\\",\\"channelno\\":\\"1017\\",\\"imei\\":\\"A000005543AFB7\\",\\"subjectNum\\":\\"089836329061\\",\\"imsi\\":\\"460036380954376\\",\\"queryNum\\":\\"13389875751\\"","requestip":"140.240.171.71","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"无查询结果"
2016-04-18 16:00:00 "areacode":"山西省","countAll":0,"countCorrect":0,"datatime":"14075772","logid":"201604181600001284030648","requestinfo":"\\"sign\\":\\"4\\",\\"timestamp\\":\\"1460966400332\\",\\"remark\\":\\"4\\",\\"subjectPro\\":\\"123456\\",\\"interfaceUserName\\":\\"12345678900987654321\\",\\"channelno\\":\\"1006\\",\\"imei\\":\\"A000004FE0218A\\",\\"subjectNum\\":\\"03514043633\\",\\"imsi\\":\\"460037471517070\\",\\"queryNum\\":\\"\\"","requestip":"1.68.5.227","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"无查询结果"
2016-04-18 16:00:00 "areacode":"四川省","countAll":0,"countCorrect":0,"datatime":"6270982","logid":"201604181600001173504863","requestinfo":"\\"sign\\":\\"4\\",\\"timestamp\\":\\"1460966398896\\",\\"remark\\":\\"4\\",\\"subjectPro\\":\\"123456\\",\\"interfaceUserName\\":\\"12345678900987654321\\",\\"channelno\\":\\"100\\",\\"imei\\":\\"12345678900987654321\\",\\"subjectNum\\":\\"13666231300\\",\\"imsi\\":\\"12345678900987654321\\",\\"queryNum\\":\\"13666231300\\"","requestip":"182.144.66.97","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"无查询结果"
2016-04-18 16:00:00 "areacode":"浙江省","countAll":0,"countCorrect":0,"datatime":"4198522","logid":"201604181600001390637240","requestinfo":"\\"sign\\":\\"4\\",\\"timestamp\\":\\"1460966399464\\",\\"remark\\":\\"4\\",\\"subjectPro\\":\\"123456\\",\\"interfaceUserName\\":\\"12345678900987654321\\",\\"channelno\\":\\"100\\",\\"imei\\":\\"12345678900987654321\\",\\"subjectNum\\":\\"05533876327\\",\\"imsi\\":\\"12345678900987654321\\",\\"queryNum\\":\\"05533876327\\"","requestip":"36.23.9.49","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"000000","responsedata":"操作成功"
2016-04-18 16:00:00 "areacode":"江苏省连云港市","countAll":0,"countCorrect":0,"datatime":"4408097","logid":"201604181600001249944032","requestinfo":"\\"sign\\":\\"4\\",\\"timestamp\\":\\"1460966395908\\",\\"remark\\":\\"4\\",\\"subjectPro\\":\\"123456\\",\\"interfaceUserName\\":\\"12345678900987654321\\",\\"channelno\\":\\"100\\",\\"imei\\":\\"12345678900987654321\\",\\"subjectNum\\":\\"18361451463\\",\\"imsi\\":\\"12345678900987654321\\",\\"queryNum\\":\\"18361451463\\"","requestip":"58.223.4.210","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"无查询结果"

最后给出运行结果截图:

以上是关于hadoop之mapreduce编程实例(系统日志初步清洗过滤处理)的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop MapReduce编程 API入门系列之网页流量版本1(二十二)

Hadoop MapReduce编程 API入门系列之网页流量版本1(二十一)

MapReduce编程实例5

Hadoop 执行引擎之 MapReduce

大数据之Hadoop(MapReduce):MapReduce编程规范

大数据之HBase MapReduce的实例分析