映射器输出直接打印到输出文件
Posted
技术标签:
【中文标题】映射器输出直接打印到输出文件【英文标题】:Mapper output printed directly to the output file 【发布时间】:2017-02-09 00:09:40 【问题描述】:我是 Hadoop 新手，正在尝试编写一些基本的 MapReduce 程序，遇到了一个奇怪的问题：mapper 的输出绕过了 reducer，被直接写入了输出文件。
代码如下
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
class WeatherRecord implements Writable
public DoubleWritable maxSum; // running sum of TMAX records
public IntWritable maxCount; // running count of TMAX records
public DoubleWritable minSum; // running sum of TMIN records
public IntWritable minCount; // running count of TMIN records
// default constructor
public WeatherRecord()
maxSum = new DoubleWritable();
maxCount= new IntWritable();
minSum = new DoubleWritable();
minCount = new IntWritable();
// custom constructor
public WeatherRecord(DoubleWritable ms, IntWritable mc, DoubleWritable ms1, IntWritable mc1)
maxSum = ms;
maxCount= mc;
minSum = ms1;
minCount = mc1;
/* Getter and setter Methods*/
//method to get running total of temperature
public double getMaxSum()
return Double.parseDouble(maxSum.toString());
//method to get running total of temperature
public double getMinSum()
return Double.parseDouble(minSum.toString());
//method to get Count
public int getMaxCount() return Integer.parseInt(maxCount.toString());
//method to get Count
public int getMinCount() return Integer.parseInt(minCount.toString());
// method to set count
public void setMaxCount(int c)
maxCount = new IntWritable(c);
// method to set count
public void setMinCount(int c)
minCount = new IntWritable(c);
//method to set reading sum
public void setMaxSum(double r)
maxSum = new DoubleWritable(r);
//method to set reading sum
public void setMinSum(double r)
minSum = new DoubleWritable(r);
// method to serialize object
public void write(DataOutput dataOutput) throws IOException
maxSum.write(dataOutput);
maxCount.write(dataOutput);
minSum.write(dataOutput);
minCount.write(dataOutput);
//method to deserialize object
public void readFields(DataInput dataInput) throws IOException
maxSum.readFields(dataInput);
maxCount.readFields(dataInput);
minSum.readFields(dataInput);
minCount.readFields(dataInput);
public class WeatherDriver extends Configured implements Tool
public static class WeatherMap extends Mapper<LongWritable, Text, Text,WeatherRecord >
HashMap<String,WeatherRecord> recordMap= new HashMap<String,WeatherRecord>();
protected void map(LongWritable key, Text value, Mapper.Context context)
//the individual records from csv file is split based on ','
String[] record = value.toString().split(",");
//station-id is the first field in the file
String stationId = record[0];
//record-type(TMAX,TMIN,..) is the third field in the csv file
String type = record[2];
//temperature readings are fourth column in the csv file
double temperature = Double.parseDouble(record[3]);
if(type.equalsIgnoreCase("TMAX") || type.equalsIgnoreCase("TMIN"))
if(recordMap.containsKey(stationId))
WeatherRecord w = recordMap.get(stationId);
if(type.equalsIgnoreCase("TMAX"))
w.setMaxCount(1 + w.getMaxCount());
w.setMaxSum(w.getMaxSum() + temperature);
else if(type.equalsIgnoreCase("TMIN"))
w.setMinCount(1+w.getMinCount());
w.setMinSum(w.getMinSum() + temperature);
recordMap.put(stationId,w);
else
if(type.equalsIgnoreCase("TMAX"))
recordMap.put(stationId, new WeatherRecord(new DoubleWritable(temperature), new IntWritable(1),
new DoubleWritable(0), new IntWritable(0)));
else if(type.equalsIgnoreCase("TMIN"))
recordMap.put(stationId, new WeatherRecord(new DoubleWritable(0), new IntWritable(0),
new DoubleWritable(temperature), new IntWritable(1)));
// end of map method
protected void cleanup(Context context) throws IOException, InterruptedException
Iterator i = recordMap.keySet().iterator();
String stationId="";
while(i.hasNext())
stationId = i.next().toString();
context.write(new Text(stationId),recordMap.get(stationId));
// end of cleanup
// end of mapper class
public static class WeatherReduce extends Reducer<Text, WeatherRecord, Text, Text>
protected void reduce(Text key, Iterator<WeatherRecord> values, Reducer<Text, WeatherRecord, Text, Text>.Context context) throws IOException, InterruptedException
// initializing local variables to compute average
int maxCount =0;
int minCount=0;
double maxSum=0;
double minSum=0;
//iterating over list of values to compute average
while(values.hasNext())
WeatherRecord record = values.next();
maxSum += Double.parseDouble(record.maxSum.toString());
maxCount += Integer.parseInt(record.maxCount.toString());
minSum += Double.parseDouble(record.minSum.toString());
minCount+=Integer.parseInt(record.minCount.toString());
// logic to handle divide by zero case
if(minCount==0)
minCount=1;
if(maxCount==0)
maxCount=1;
System.out.println("Min Sum is" + minSum);
context.write(new Text(key), new Text(","+(minSum/minCount)+","+(maxSum/maxCount)));
@Override
public int run(String[] args) throws Exception
Configuration conf = new Configuration();
args = new GenericOptionsParser(conf, args).getRemainingArgs();
String input = args[0];
String output = args[1];
Job job = new Job(conf, "weather average");
job.setJarByClass(WeatherMap.class);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(WeatherMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(WeatherRecord.class);
job.setReducerClass(WeatherReduce.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path(input));
Path outPath = new Path(output);
FileOutputFormat.setOutputPath(job, outPath);
outPath.getFileSystem(conf).delete(outPath, true);
job.waitForCompletion(true);
return (job.waitForCompletion(true) ? 0 : 1);
public static void main(String[] args) throws Exception
int exitCode = ToolRunner.run(new WeatherDriver(), args);
System.exit(exitCode);
预期的输出格式为 station_id,average_min_temp,average_max_temp，例如：
AGE00135039,123.12,11
但实际得到的是下面的输出。分析代码后，我发现 mapper 中 context.write 写出的内容被直接写入了输出文件：
AGE00135039 WeatherRecord@7df8df85
AGE00135039 WeatherRecord@7df8df85
AGE00135039 WeatherRecord@7df8df85
AGE00135039 WeatherRecord@7df8df85
AGE00135039 WeatherRecord@7df8df85
AGE00135039 WeatherRecord@7df8df85
AGE00135039 WeatherRecord@7df8df85
AGE00135039 WeatherRecord@7df8df85
AGE00135039 WeatherRecord@7df8df85
AGE00147704 WeatherRecord@7df8df85
AGE00147704 WeatherRecord@7df8df85
AGE00147704 WeatherRecord@7df8df85
AGE00147704 WeatherRecord@7df8df85
AGE00147704 WeatherRecord@7df8df85
AGE00147704 WeatherRecord@7df8df85
【问题讨论】:
【参考方案1】:您的作业实际执行的很可能是 Reducer 基类中默认的 reduce() 方法（它会把 mapper 的输出原样写出），因为您自己的 reduce() 方法签名不正确，并没有覆盖父类方法。您写的是：
protected void reduce(Text key, Iterator<WeatherRecord> values, Context context)
应该是这样的:
protected void reduce(Text key, Iterable<WeatherRecord> values, Context context)
注意第二个参数的类型从 Iterator 改成了 Iterable。
避免此类问题的一种方法是：在每个您认为应当覆盖父类实现的方法上添加 @Override 注解；如果该方法实际上并没有覆盖任何父类方法，编译器就会报错。
【讨论】:
以上是关于映射器输出直接打印到输出文件的主要内容,如果未能解决你的问题,请参考以下文章