映射器输出直接打印到输出文件

Posted

技术标签:

【中文标题】映射器输出直接打印到输出文件【英文标题】:Mapper output printed directly to the output file 【发布时间】:2017-02-09 00:09:40 【问题描述】:

我是 hadoop 新手,正在尝试一些基本的 map reduce 程序。我遇到了一些特殊的问题。我观察到我的映射器输出通过绕过减速器直接打印到输出文件。

代码如下

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;



class WeatherRecord implements Writable 
    public DoubleWritable maxSum; // running sum of TMAX records
    public IntWritable maxCount;  // running count of TMAX records
    public DoubleWritable minSum; // running sum of TMIN records
    public IntWritable minCount; // running count of TMIN records

    // default constructor
    public WeatherRecord()

        maxSum = new DoubleWritable();
        maxCount= new IntWritable();
        minSum = new DoubleWritable();
        minCount = new IntWritable();
    

    // custom constructor
    public WeatherRecord(DoubleWritable ms, IntWritable mc, DoubleWritable ms1, IntWritable mc1)
        maxSum = ms;
        maxCount= mc;
        minSum = ms1;
        minCount = mc1;
    

     /* Getter and setter Methods*/



    //method to get running total of temperature
    public double getMaxSum()
        return Double.parseDouble(maxSum.toString());
    

    //method to get running total of temperature
    public double getMinSum()
        return Double.parseDouble(minSum.toString());
    

    //method to get Count
    public int getMaxCount() return Integer.parseInt(maxCount.toString());

    //method to get Count
    public int getMinCount() return Integer.parseInt(minCount.toString());

    // method to set count
    public void setMaxCount(int c)
        maxCount = new IntWritable(c);
    

    // method to set count
    public void setMinCount(int c)
        minCount = new IntWritable(c);
    

    //method to set reading sum
    public void setMaxSum(double r)
        maxSum = new DoubleWritable(r);
    

    //method to set reading sum
    public void setMinSum(double r)
        minSum = new DoubleWritable(r);
    

    // method to serialize object
    public void write(DataOutput dataOutput) throws IOException 
        maxSum.write(dataOutput);
        maxCount.write(dataOutput);
        minSum.write(dataOutput);
        minCount.write(dataOutput);
    

    //method to deserialize object
    public void readFields(DataInput dataInput) throws IOException 
        maxSum.readFields(dataInput);
        maxCount.readFields(dataInput);
        minSum.readFields(dataInput);
        minCount.readFields(dataInput);
    



public class WeatherDriver extends Configured implements Tool

    public static class WeatherMap extends Mapper<LongWritable, Text, Text,WeatherRecord > 

      HashMap<String,WeatherRecord> recordMap= new HashMap<String,WeatherRecord>();

        protected void map(LongWritable key, Text value, Mapper.Context context) 
            //the individual records from csv file is split based on ','
            String[] record = value.toString().split(",");

            //station-id is the first field in the file
            String stationId = record[0];

            //record-type(TMAX,TMIN,..) is the third field in the csv file
            String type = record[2];

            //temperature readings are fourth column in the csv file
            double temperature = Double.parseDouble(record[3]);


            if(type.equalsIgnoreCase("TMAX") || type.equalsIgnoreCase("TMIN"))

                if(recordMap.containsKey(stationId))
                    WeatherRecord w = recordMap.get(stationId);
                    if(type.equalsIgnoreCase("TMAX"))
                        w.setMaxCount(1 + w.getMaxCount());
                        w.setMaxSum(w.getMaxSum() + temperature);
                    
                    else if(type.equalsIgnoreCase("TMIN"))
                        w.setMinCount(1+w.getMinCount());
                        w.setMinSum(w.getMinSum() + temperature);
                    
                    recordMap.put(stationId,w);
                
                else
                    if(type.equalsIgnoreCase("TMAX"))
                        recordMap.put(stationId, new WeatherRecord(new DoubleWritable(temperature), new IntWritable(1),
                                new DoubleWritable(0), new IntWritable(0)));
                    
                    else if(type.equalsIgnoreCase("TMIN"))
                        recordMap.put(stationId, new WeatherRecord(new DoubleWritable(0), new IntWritable(0),
                                new DoubleWritable(temperature), new IntWritable(1)));
                    

                
            

         // end of map method

        protected void cleanup(Context context) throws IOException, InterruptedException 
            Iterator i =  recordMap.keySet().iterator();
            String stationId="";
            while(i.hasNext())
                stationId = i.next().toString();

                context.write(new Text(stationId),recordMap.get(stationId));
            
         // end of cleanup
     // end of mapper class


    public static class WeatherReduce extends Reducer<Text, WeatherRecord, Text, Text> 

        protected void reduce(Text key, Iterator<WeatherRecord> values, Reducer<Text, WeatherRecord, Text, Text>.Context context) throws IOException, InterruptedException 
            // initializing local variables to compute average
            int maxCount =0;
            int minCount=0;
            double maxSum=0;
            double minSum=0;



            //iterating over list of values to compute average
            while(values.hasNext())
                WeatherRecord record = values.next();

                maxSum += Double.parseDouble(record.maxSum.toString());
                maxCount += Integer.parseInt(record.maxCount.toString());
                minSum += Double.parseDouble(record.minSum.toString());
                minCount+=Integer.parseInt(record.minCount.toString());

            

            // logic to handle divide by zero case

            if(minCount==0)
                minCount=1;
            
            if(maxCount==0)
                maxCount=1;
            

            System.out.println("Min Sum is" + minSum);

            context.write(new Text(key), new Text(","+(minSum/minCount)+","+(maxSum/maxCount)));


        
    

    @Override
    public int run(String[] args) throws Exception 
        Configuration conf = new Configuration();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();
        String input = args[0];
        String output = args[1];

        Job job = new Job(conf, "weather average");
        job.setJarByClass(WeatherMap.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(WeatherMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(WeatherRecord.class);

        job.setReducerClass(WeatherReduce.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        Path outPath = new Path(output);
        FileOutputFormat.setOutputPath(job, outPath);
        outPath.getFileSystem(conf).delete(outPath, true);

        job.waitForCompletion(true);
        return (job.waitForCompletion(true) ? 0 : 1);
    

    public static void main(String[] args) throws Exception 
        int exitCode = ToolRunner.run(new WeatherDriver(), args);
        System.exit(exitCode);
    

预期的输出是 station_id,average_min_temp,average_max_temp

AGE00135039,123.12,11

但是，我得到的却是下面这样的输出。通过分析代码，我发现 mapper 中 context.write 写出的内容被直接写入了输出文件，完全绕过了 reducer

AGE00135039 WeatherRecord@7df8df85
AGE00135039 WeatherRecord@7df8df85
AGE00135039 WeatherRecord@7df8df85
AGE00135039 WeatherRecord@7df8df85
AGE00135039 WeatherRecord@7df8df85
AGE00135039 WeatherRecord@7df8df85
AGE00135039 WeatherRecord@7df8df85
AGE00135039 WeatherRecord@7df8df85
AGE00135039 WeatherRecord@7df8df85
AGE00147704 WeatherRecord@7df8df85
AGE00147704 WeatherRecord@7df8df85
AGE00147704 WeatherRecord@7df8df85
AGE00147704 WeatherRecord@7df8df85
AGE00147704 WeatherRecord@7df8df85
AGE00147704 WeatherRecord@7df8df85

【问题讨论】:

【参考方案1】:

您的作业很可能调用的是 Reducer 基类中默认的 reduce() 方法（恒等实现，直接原样输出），因为您自己编写的 reduce() 方法签名不正确，并未真正覆盖基类方法。你写的是：

protected void reduce(Text key, Iterator&lt;WeatherRecord&gt; values, Context context)

应该是这样的:

protected void reduce(Text key, Iterable&lt;WeatherRecord&gt; values, Context context)

注意参数类型从 Iterator 变为 Iterable 的变化。

尝试避免这种情况的一种方法是将 @Override 注释添加到您认为应该覆盖基本实现的方法中,如果您不这样做,它将引发编译时错误。

【讨论】:

以上是关于映射器输出直接打印到输出文件的主要内容,如果未能解决你的问题,请参考以下文章

在映射器的单个输出上运行多个减速器

将文件从 AWS EMR 集群中的映射器上传到 S3

Hadoop MapReduce访问减速器中的映射器输出编号

‘终结点映射器中没有更多的终结点可用’

在可视化映射器中将命名空间添加到根节点

具有单个映射器和两个不同减速器的 hadoop 作业