Hadoop Lab: MR Programming Quick Notes
Posted by JintuZheng
Unified driver (Main):
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.lib.output.*;
public class main {
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(main.class);
job.setMapperClass(myMapper.class);
job.setReducerClass(myReducer.class);
job.setCombinerClass(myCombiner.class); // optional; only for jobs that define a combiner (e.g. Top N below)
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path("D:/fs_local/in"));
FileOutputFormat.setOutputPath(job, new Path("D:/fs_local/out"));
job.waitForCompletion(true);
}
}
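Note that the driver above registers a combiner, but the WordCount job below defines only myMapper and myReducer. For WordCount you can either drop the setCombinerClass line or reuse the reducer as a local combiner, which is safe because merging partial word counts gives the same result as summing everything at once. A minimal sketch, using the WordCount class names from the listings below:

// WordCount has no dedicated combiner class; reuse the reducer as the combiner.
// Valid here because the reduce operation (integer sum) is associative and commutative.
job.setCombinerClass(myReducer.class);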
1. WordCount
Mapper:
package wordCount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class myMapper extends Mapper<Object, Text, Text, IntWritable>{
@Override
protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException{
String lineContent = value.toString(); // Get line content
String result[] = lineContent.split(" "); // split line to get word
// emit a (word, 1) pair for each word in the line
for (int x = 0; x < result.length; x++) {
context.write(new Text(result[x]), new IntWritable(1));
}
}
}
Reducer:
package wordCount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class myReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException{
int sum = 0;
for(IntWritable count : values) {
sum+=count.get();
}
context.write(key, new IntWritable(sum));
}
}
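As a quick sanity check, suppose the input directory holds a single file with the following two lines (hypothetical sample data):

hello hadoop
hello world

The mapper emits (hello,1), (hadoop,1), (hello,1), (world,1); after the shuffle the reducer sees hello -> [1,1], hadoop -> [1], world -> [1], so the output file (tab-separated, keys in sorted order) contains:

hadoop	1
hello	2
world	1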
2. Top N
Mapper:
package topN;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class myMapper extends Mapper<Object, Text, Text, IntWritable>{
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException{
String line = value.toString();
context.write(new Text("Max"), new IntWritable(Integer.parseInt(line)));
}
}
Combiner:
package topN;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class myCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
ArrayList<Integer> tempA = new ArrayList<Integer>();
for(IntWritable value : values) {
tempA.add(value.get());
}
tempA.sort(null); // ascending natural order
// emit the local top 3; guard against a split that holds fewer than 3 values
int n = tempA.size();
for (int i = n - 1; i >= 0 && i >= n - 3; i--) {
context.write(new Text("Max"), new IntWritable(tempA.get(i)));
}
}
}
Reducer:
package topN;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class myReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
ArrayList<Integer> tempA = new ArrayList<Integer>();
for(IntWritable value : values) {
tempA.add(value.get());
}
tempA.sort(null); // ascending natural order
// write out the global top 3, largest first; guard against fewer than 3 values in total
int n = tempA.size();
String[] labels = {"L1Max", "L2Max", "L3Max"};
for (int i = 0; i < 3 && i < n; i++) {
context.write(new Text(labels[i]), new IntWritable(tempA.get(n - 1 - i)));
}
}
}
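To see how the combiner and reducer cooperate, consider a hypothetical input of one integer per line split across two mappers: split A holds 3, 17, 8, 42 and split B holds 25, 6, 31. Each combiner emits its local top 3 under the shared key "Max" (A: 42, 17, 8; B: 31, 25, 6), so the single reduce call receives only six candidate values instead of all seven inputs, sorts them, and writes the global top 3 as L1Max 42, L2Max 31, L3Max 25.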
3. Trade
Bean
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class TradeBean implements WritableComparable<TradeBean>{
private String username;
private int income;
private int outcome;
private int profit;
public TradeBean(){}
//Getter & Setter
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public int getIncome() {
return income;
}
public int getOutcome() {
return outcome;
}
public int getProfit() {
return profit;
}
public void setIncome(int income) {
this.income = income;
}
public void setOutcome(int outcome) {
this.outcome = outcome;
}
public void setProfit(int profit) {
this.profit = profit;
}
@Override
public String toString() {
// Override toString: this determines exactly how a TradeBean is written to the final output file
return username + "\t" + income +
"\t" + outcome +
"\t" + profit;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
// write the fields in the same order they are declared so serialization is consistent
dataOutput.writeUTF(username);
dataOutput.writeInt(income);
dataOutput.writeInt(outcome);
dataOutput.writeInt(profit);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
// read the fields in the same order they were written so deserialization matches
this.username=dataInput.readUTF();
this.income=dataInput.readInt();
this.outcome=dataInput.readInt();
this.profit=dataInput.readInt();
}
@Override
public int compareTo(TradeBean bean) {
int result = bean.profit-this.profit; // sort by profit, descending
if(result==0) // if profits are equal, sort by income, descending
return bean.income-this.income;
return result;
}
}
Mapper
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class TradeMapper extends Mapper<LongWritable, Text,Text,TradeBean> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] words = value.toString().split(" ");
TradeBean bean = new TradeBean();
// only the income and expense fields are needed; the time field is never read
int income = Integer.parseInt(words[1]);
int outcome = Integer.parseInt(words[2]);
bean.setUsername(words[0]);
bean.setIncome(income);
bean.setOutcome(outcome);
bean.setProfit(income-outcome);
context.write(new Text(words[0]),bean);
}
}
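The mapper assumes each input line is space-separated with the username first, followed by income and expense; any trailing field such as a timestamp is simply ignored. Hypothetical sample lines, with the format inferred from the code above:

zhangsan 6000 0 2021-01-01
zhangsan 0 1000 2021-01-02
lisi 2000 500 2021-01-01

For the first line the mapper emits key zhangsan with a TradeBean whose income is 6000, outcome is 0 and profit is 6000, so all records of one user end up in the same reduce call.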
Reducer
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.TreeMap;
public class TradeReducer extends Reducer<Text, TradeBean
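The TradeReducer listing above breaks off right after the class declaration. The following is a minimal sketch of how it could be completed, not the author's original code: judging from the NullWritable and TreeMap imports and from TradeBean.compareTo, it assumes the reducer sums income and expense per user and collects the beans in a TreeMap so the final output is sorted by profit (descending). The output types TradeBean/NullWritable are assumptions. One caveat of the TreeMap approach: compareTo only looks at profit and income, so two different users with identical profit and income would collide and one of them would be silently dropped.

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.TreeMap;
public class TradeReducer extends Reducer<Text, TradeBean, TradeBean, NullWritable> {
    // Sorted by TradeBean.compareTo: profit descending, then income descending (assumed design).
    private final TreeMap<TradeBean, String> sorted = new TreeMap<>();
    @Override
    protected void reduce(Text key, Iterable<TradeBean> values, Context context) throws IOException, InterruptedException {
        // Sum every income/expense record that belongs to this user.
        int incomeSum = 0;
        int outcomeSum = 0;
        for (TradeBean value : values) {
            incomeSum += value.getIncome();
            outcomeSum += value.getOutcome();
        }
        TradeBean bean = new TradeBean();
        bean.setUsername(key.toString());
        bean.setIncome(incomeSum);
        bean.setOutcome(outcomeSum);
        bean.setProfit(incomeSum - outcomeSum);
        sorted.put(bean, key.toString());
    }
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // After all reduce calls, emit the beans in sorted order; toString() defines each output line.
        for (TradeBean bean : sorted.keySet()) {
            context.write(bean, NullWritable.get());
        }
    }
}

A driver matching this sketch would also need job.setMapOutputValueClass(TradeBean.class), job.setOutputKeyClass(TradeBean.class) and job.setOutputValueClass(NullWritable.class) in place of the Text/IntWritable settings shown in the unified driver at the top.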