Hadoop Lab: MapReduce (MR) Programming Quick Notes

Posted by JintuZheng

Shared Main (driver), reused across all three examples:

import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.lib.output.*;

public class main {

	public static void main(String[] args) throws Exception{
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(main.class);
		
		job.setMapperClass(myMapper.class);
		job.setReducerClass(myReducer.class);
		job.setCombinerClass(myCombiner.class); // only for jobs that define a combiner (the Top N job below)
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		FileInputFormat.setInputPaths(job, new Path("D:/fs_local/in"));
		FileOutputFormat.setOutputPath(job, new Path("D:/fs_local/out"));
		
		System.exit(job.waitForCompletion(true) ? 0 : 1); // exit with the job's success status
	}

}
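One caveat about this shared driver: the key/value classes and the combiner line must match the job at hand. The settings above fit WordCount and Top N; for the Trade job below, whose mapper emits TradeBean values and whose reducer writes a TradeBean key with a NullWritable value, the corresponding lines would look like this sketch:

// Trade variant of the driver settings (sketch; class names as defined in section 3)
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TradeBean.class);
job.setOutputKeyClass(TradeBean.class);
job.setOutputValueClass(NullWritable.class);
// no setCombinerClass(...) here: the Trade job has no combiner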

1. WordCount

Mapper:

package wordCount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class myMapper extends Mapper<Object, Text, Text, IntWritable>{
	@Override
	protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
		String lineContent = value.toString();     // get the line content
		String[] result = lineContent.split(" ");  // split the line into words
		// emit one (word, 1) pair per token; skip empty tokens produced by repeated spaces
		for (int x = 0; x < result.length; x++) {
			if (!result[x].isEmpty()) {
				context.write(new Text(result[x]), new IntWritable(1));
			}
		}
	}
}

Reducer:

package wordCount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class myReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		int sum = 0;
		for (IntWritable count : values) { // add up the 1s emitted for this word
			sum += count.get();
		}
		context.write(key, new IntWritable(sum));
	}
}
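As a quick sanity check, a hypothetical two-line input file:

hello hadoop
hello world

would produce the following output (one tab-separated line per word):

hadoop	1
hello	2
world	1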

2. Top N

Mapper:

package topN;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class myMapper extends Mapper<Object, Text, Text, IntWritable>{
	@Override
	protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
		String line = value.toString().trim(); // one integer per line
		// every record is emitted under the same key, so all numbers meet in a single reduce group
		context.write(new Text("Max"), new IntWritable(Integer.parseInt(line)));
	}
}

Combiner:

package topN;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


public class myCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		ArrayList<Integer> tempA = new ArrayList<Integer>();
		for (IntWritable value : values) {
			tempA.add(value.get());
		}
		tempA.sort(null); // ascending natural order
		// forward only this map task's local top 3 (or fewer, if it saw fewer than 3 numbers)
		int n = Math.min(3, tempA.size());
		for (int i = 1; i <= n; i++) {
			context.write(new Text("Max"), new IntWritable(tempA.get(tempA.size() - i)));
		}
	}
}
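The combiner is what keeps the shuffle small: instead of sending every number to the single reduce group, each map task forwards only its local top 3. Hadoop may run a combiner zero or more times, but correctness holds either way, since it only discards values that cannot be in the global top 3. With, say, two map tasks holding {3, 9, 5, 7} and {8, 1, 2, 6}, the combiners forward {9, 7, 5} and {8, 6, 2}, and the reducer picks 9, 8, 7 from those six candidates.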

Reducer:

package topN;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class myReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		ArrayList<Integer> tempA = new ArrayList<Integer>();
		for (IntWritable value : values) {
			tempA.add(value.get());
		}
		tempA.sort(null); // ascending natural order
		// label and emit the global top 3 from the candidates the combiners forwarded
		String[] labels = {"L1Max", "L2Max", "L3Max"};
		int n = Math.min(3, tempA.size());
		for (int i = 1; i <= n; i++) {
			context.write(new Text(labels[i - 1]), new IntWritable(tempA.get(tempA.size() - i)));
		}
	}

}
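For this job the shared driver must register the combiner (job.setCombinerClass(myCombiner.class);). The output is still correct without it; the difference is that the single reduce group then has to collect and sort every input number instead of at most three per map task.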


3. Trade

Bean:

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class TradeBean implements WritableComparable<TradeBean>{
    private String username;
    private int income;
    private int outcome;
    private int profit;
    public TradeBean(){}
    //Getter & Setter
    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public int getIncome() {
        return income;
    }

    public int getOutcome() {
        return outcome;
    }

    public int getProfit() {
        return profit;
    }

    public void setIncome(int income) {
        this.income = income;
    }

    public void setOutcome(int outcome) {
        this.outcome = outcome;
    }

    public void setProfit(int profit) {
        this.profit = profit;
    }
	
    @Override
    public String toString() {
        // toString matters here: it decides exactly how a TradeBean is rendered in the final output file
        return username + "\t" + income + "\t" + outcome + "\t" + profit;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        // write the fields in the same order they are declared, so serialization is consistent
        dataOutput.writeUTF(username);
        dataOutput.writeInt(income);
        dataOutput.writeInt(outcome);
        dataOutput.writeInt(profit);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // read the fields back in the same declaration order, for deserialization
        this.username=dataInput.readUTF();
        this.income=dataInput.readInt();
        this.outcome=dataInput.readInt();
        this.profit=dataInput.readInt();
    }

    @Override
    public int compareTo(TradeBean bean) {
        // sort by profit, descending; Integer.compare avoids the overflow risk of plain subtraction
        int result = Integer.compare(bean.profit, this.profit);
        if (result == 0) // equal profit: fall back to income, descending
            return Integer.compare(bean.income, this.income);
        return result;
    }
}
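A quick, hypothetical check of this ordering (not part of the job itself; TradeBean as defined above):

public class CompareDemo {
    public static void main(String[] args) {
        TradeBean a = new TradeBean();
        a.setIncome(1000);
        a.setProfit(400);
        TradeBean b = new TradeBean();
        b.setIncome(900);
        b.setProfit(250);
        // negative result: a (the higher profit) sorts first, e.g. as a TreeMap key
        System.out.println(a.compareTo(b));
    }
}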

Mapper:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class TradeMapper extends Mapper<LongWritable, Text,Text,TradeBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] words = value.toString().split(" ");
        TradeBean bean = new TradeBean(); 
        // keep only the income and outcome columns; the time field is ignored
        int income = Integer.parseInt(words[1]);
        int outcome = Integer.parseInt(words[2]);
        bean.setUsername(words[0]);
        bean.setIncome(income);
        bean.setOutcome(outcome);
        bean.setProfit(income-outcome);
        context.write(new Text(words[0]),bean);
    }
}
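The split indices assume whitespace-separated lines of the form username income outcome time (a layout inferred from the code; the exact columns are an assumption). For a hypothetical record:

zhangsan 1000 600 20200101

the mapper emits the key zhangsan with a bean holding income 1000, outcome 600, profit 400.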

Reducer (note: the original listing was truncated mid-class; the body below is a minimal reconstruction consistent with the imports, using a TreeMap keyed by TradeBean to order the output, with NullWritable values):

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.TreeMap;

// reconstruction sketch: total each user's records, then let TradeBean.compareTo
// (via the TreeMap key) order the final output by profit, descending
public class TradeReducer extends Reducer<Text, TradeBean, TradeBean, NullWritable> {
    private final TreeMap<TradeBean, String> sorted = new TreeMap<TradeBean, String>();

    @Override
    protected void reduce(Text key, Iterable<TradeBean> values, Context context) throws IOException, InterruptedException {
        int income = 0;
        int outcome = 0;
        for (TradeBean bean : values) { // sum all records belonging to this user
            income += bean.getIncome();
            outcome += bean.getOutcome();
        }
        TradeBean total = new TradeBean();
        total.setUsername(key.toString());
        total.setIncome(income);
        total.setOutcome(outcome);
        total.setProfit(income - outcome);
        sorted.put(total, key.toString()); // TreeMap keeps beans sorted by compareTo
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // emit after all users are aggregated; TradeBean.toString formats each output line
        for (TradeBean bean : sorted.keySet()) {
            context.write(bean, NullWritable.get());
        }
    }
}
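
With the hypothetical input sketched in the Mapper section, the output file would contain one line per user, formatted by toString and ordered by profit, e.g. zhangsan	1000	600	400 ahead of any user with a smaller profit.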
