Hadoop MapReduce: context.write changes values

Posted: 2018-03-06 21:19:10

Question:

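I am new to Hadoop and am writing a MapReduce job. I have run into a problem where the reducer's context.write method turns correct values into incorrect ones.

What is the MapReduce job supposed to do?

- Count the total number of words (int wordCount)
- Count the number of distinct words (int counter_dist)
- Count the number of words that start with "z" or "Z" (int counter_startZ)
- Count the number of words that appear fewer than 4 times (int counter_less4)

All of this has to be done in a single MapReduce job.

Text file being analyzed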

Hello how zou zou zou zou how are you

Correct output:

wordCount = 9
counter_dist = 5
counter_startZ = 4
counter_less4 = 4
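As a sanity check on the expected numbers, the four statistics can be computed outside Hadoop. The following is a minimal plain-Java sketch under that assumption; the class name WordStats and the method compute are hypothetical and are not part of the original job.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringTokenizer;

// Hypothetical helper, not part of the original job: computes the four
// statistics in plain Java so the expected numbers can be checked by hand.
class WordStats {

    // Returns {wordCount, counter_dist, counter_startZ, counter_less4}.
    static int[] compute(String text) {
        Map<String, Integer> freq = new LinkedHashMap<>();
        int wordCount = 0;
        int counterStartZ = 0;

        // Same whitespace tokenization as the mapper in the question.
        StringTokenizer itr = new StringTokenizer(text);
        while (itr.hasMoreTokens()) {
            String token = itr.nextToken();
            wordCount++;
            if (token.startsWith("z") || token.startsWith("Z")) {
                counterStartZ++;
            }
            freq.merge(token, 1, Integer::sum);
        }

        // A word counts as "less than 4" when it occurs fewer than 4 times.
        int counterLess4 = 0;
        for (int occurrences : freq.values()) {
            if (occurrences < 4) {
                counterLess4++;
            }
        }
        return new int[] {wordCount, freq.size(), counterStartZ, counterLess4};
    }

    public static void main(String[] args) {
        int[] s = compute("Hello how zou zou zou zou how are you");
        System.out.println("wordCount = " + s[0]);
        System.out.println("counter_dist = " + s[1]);
        System.out.println("counter_startZ = " + s[2]);
        System.out.println("counter_less4 = " + s[3]);
    }
}
```

For the sample line this yields 9, 5, 4 and 4, which agrees with the correct output listed above.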

Mapper class

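import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Emit (token, 1) for every whitespace-separated token in the line.
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            String hasKey = itr.nextToken();
            word.set(hasKey);
            context.write(word, one);
        }
    }
}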

Reducer class

To debug my code, I print many statements to check my values. The resulting stdout log is shown further below.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    int wordCount = 0; // Total number of words
    int counter_dist = 0; // Number of distinct words in the corpus
    int counter_startZ = 0; // Number of words that start with letter Z
    int counter_less4 = 0; // Number of words that appear less than 4 times

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int repeatedWords = 0;
        System.out.println("###Reduce method starts");
        System.out.println("Values: wordCount:" + wordCount + " counter_dist:" + counter_dist + " counter_startZ:" + counter_startZ + " counter_less4:" + counter_less4 + " (start)");
        for (IntWritable val : values) {
            System.out.println("Key: " + key.toString());
            repeatedWords++;
            wordCount += val.get();
            if (key.toString().startsWith("z") || key.toString().startsWith("Z")) {
                counter_startZ++;
            }
            System.out.println("Values: wordCount:" + wordCount + " counter_dist:" + counter_dist + " counter_startZ:" + counter_startZ + " counter_less4:" + counter_less4 + " (end of loop)");
        }
        counter_dist++;

        if (repeatedWords < 4) {
            counter_less4++;
        }

        System.out.println("Values: wordCount:" + wordCount + " counter_dist:" + counter_dist + " counter_startZ:" + counter_startZ + " counter_less4:" + counter_less4 + " repeatedWords:" + repeatedWords + " (end)");
        System.out.println("###Reduce method ends\n");
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        System.out.println("###CLEANUP: wordCount: " + wordCount);
        System.out.println("###CLEANUP: counter_dist: " + counter_dist);
        System.out.println("###CLEANUP: counter_startZ: " + counter_startZ);
        System.out.println("###CLEANUP: counter_less4: " + counter_less4);

        // Emit the four totals once, after all keys have been reduced.
        context.write(new Text("Total words: "), new IntWritable(wordCount));
        context.write(new Text("Distinct words: "), new IntWritable(counter_dist));
        context.write(new Text("Starts with Z: "), new IntWritable(counter_startZ));
        context.write(new Text("Appears less than 4 times:"), new IntWritable(counter_less4));
    }
}


Stdout log that I used for debugging

###Reduce method starts
Values: wordCount:0 counter_dist:0 counter_startZ:0 counter_less4:0 (start)
Key: Hello
Values: wordCount:1 counter_dist:0 counter_startZ:0 counter_less4:0 (end of loop)
Values: wordCount:1 counter_dist:1 counter_startZ:0 counter_less4:1 repeatedWords:1 (end)
###Reduce method ends

###Reduce method starts
Values: wordCount:1 counter_dist:1 counter_startZ:0 counter_less4:1 (start)
Key: are
Values: wordCount:2 counter_dist:1 counter_startZ:0 counter_less4:1 (end of loop)
Values: wordCount:2 counter_dist:2 counter_startZ:0 counter_less4:2 repeatedWords:1 (end)
###Reduce method ends

###Reduce method starts
Values: wordCount:2 counter_dist:2 counter_startZ:0 counter_less4:2 (start)
Key: how
Values: wordCount:3 counter_dist:2 counter_startZ:0 counter_less4:2 (end of loop)
Key: how
Values: wordCount:4 counter_dist:2 counter_startZ:0 counter_less4:2 (end of loop)
Values: wordCount:4 counter_dist:3 counter_startZ:0 counter_less4:3 repeatedWords:2 (end)
###Reduce method ends

###Reduce method starts
Values: wordCount:4 counter_dist:3 counter_startZ:0 counter_less4:3 (start)
Key: you
Values: wordCount:5 counter_dist:3 counter_startZ:0 counter_less4:3 (end of loop)
Values: wordCount:5 counter_dist:4 counter_startZ:0 counter_less4:4 repeatedWords:1 (end)
###Reduce method ends

###Reduce method starts
Values: wordCount:5 counter_dist:4 counter_startZ:0 counter_less4:4 (start)
Key: zou
Values: wordCount:6 counter_dist:4 counter_startZ:1 counter_less4:4 (end of loop)
Key: zou
Values: wordCount:7 counter_dist:4 counter_startZ:2 counter_less4:4 (end of loop)
Key: zou
Values: wordCount:8 counter_dist:4 counter_startZ:3 counter_less4:4 (end of loop)
Key: zou
Values: wordCount:9 counter_dist:4 counter_startZ:4 counter_less4:4 (end of loop)
Values: wordCount:9 counter_dist:5 counter_startZ:4 counter_less4:4 repeatedWords:4 (end)
###Reduce method ends

###CLEANUP: wordCount: 9
###CLEANUP: counter_dist: 5
###CLEANUP: counter_startZ: 4
###CLEANUP: counter_less4: 4

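According to the log, all values are correct and everything works. However, when I open the output directory in HDFS and read the "part-r-00000" file, the output written there by context.write is completely different.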

Total words: 22
Distinct words: 4
Starts with Z: 0
Appears less than 4 times: 4

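Comments:

I have added the complete code logic. I hope you can understand it. Let me know.

This seems like a strange thing. Have you tried debugging your code? Look at the variables!

Answer 1: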

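You should never rely on the cleanup() method for critical program logic. cleanup() runs once at the end of each task, that is, each time a task JVM is torn down. So, depending on how many JVMs are spawned and killed (which you cannot predict), your logic becomes unstable.

Move both the initialization and the writes to the context into the reduce method.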

int wordCount = 0; // Total number of words
int counter_dist = 0; // Number of distinct words in the corpus
int counter_startZ = 0; // Number of words that start with letter Z
int counter_less4 = 0; // Number of words that appear less than 4 

context.write(new Text("Total words: "), new IntWritable(wordCount));
context.write(new Text("Distinct words: "), new IntWritable(counter_dist));
context.write(new Text("Starts with Z: "), new IntWritable(counter_startZ));
context.write(new Text("Appears less than 4 times:"), new IntWritable(counter_less4));

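Edit: Based on the OP's comments, it seems the whole logic is flawed.

Below is code that produces the desired result. Note that I did not implement setup() or cleanup(), because they are not needed at all.

Use counters to count what you are looking for. Once the MapReduce job has completed, simply read the counters in the driver class.

For example, the total number of words and the number of words starting with "z" or "Z" can be counted in the mapper.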

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            String hasKey = itr.nextToken();
            word.set(hasKey);
            // Every token counts toward the total.
            context.getCounter("my_counters", "TOTAL_WORDS").increment(1);
            // Case-insensitive check for words starting with "z".
            if (hasKey.toUpperCase().startsWith("Z")) {
                context.getCounter("my_counters", "Z_WORDS").increment(1);
            }
            context.write(word, one);
        }
    }
}

The number of distinct words and the number of words appearing fewer than 4 times can be counted with counters in the reducer.

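import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int wordCount = 0;
        // reduce() is invoked once per distinct key, so each call is one distinct word.
        context.getCounter("my_counters", "DISTINCT_WORDS").increment(1);
        for (IntWritable val : values) {
            wordCount += val.get();
        }
        if (wordCount < 4) {
            context.getCounter("my_counters", "WORDS_LESS_THAN_4").increment(1);
        }
    }
}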

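Read the counters in the driver class. The following code goes after the line where you submit the job and wait for it to finish (for example, after job.waitForCompletion(true)).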

// Requires org.apache.hadoop.mapreduce.Counter and org.apache.hadoop.mapreduce.CounterGroup.
CounterGroup group = job.getCounters().getGroup("my_counters");

for (Counter counter : group) {
    System.out.println(counter.getName() + "=" + counter.getValue());
}

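Comments:

Unfortunately, this is not what I want. I need only 4 lines in the output, with the correctly counted values (the "correct output" in the question). This solution adds 4 lines to the output every time the reduce method runs.

By writing your code logic in the cleanup method, you are making a fundamental mistake. You have to understand what the setup and cleanup methods do, especially when Hadoop spawns a new JVM for each reducer. If the fix above does not work, it means your logic needs to change.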
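The driver class is not shown in the question, so the following is only a hypothesis: the reported numbers (22, 4, 0, 4) are what one would get if WordCountReducer were also registered as the combiner. In that case the combiner's cleanup() emits the four summary lines, and the real reducer then treats those four labels as if they were words. The plain-Java sketch below simulates that assumed pipeline without any Hadoop classes; CombinerSimulation, SimReducer, runPass and simulate are hypothetical names, and a single map task with one combiner pass is assumed.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical plain-Java simulation; no Hadoop classes are used.
class CombinerSimulation {

    // Mirrors the question's reducer: counters live in fields, reduce() updates
    // them, and cleanup() emits the four summary pairs once per task.
    static class SimReducer {
        int wordCount, counterDist, counterStartZ, counterLess4;

        void reduce(String key, List<Integer> values) {
            int repeatedWords = 0;
            for (int val : values) {
                repeatedWords++;
                wordCount += val;
                if (key.startsWith("z") || key.startsWith("Z")) {
                    counterStartZ++;
                }
            }
            counterDist++;
            if (repeatedWords < 4) {
                counterLess4++;
            }
        }

        Map<String, Integer> cleanup() {
            Map<String, Integer> out = new LinkedHashMap<>();
            out.put("Total words: ", wordCount);
            out.put("Distinct words: ", counterDist);
            out.put("Starts with Z: ", counterStartZ);
            out.put("Appears less than 4 times:", counterLess4);
            return out;
        }
    }

    // One pass of the reducer class: group pairs by key, reduce each group, clean up.
    static Map<String, Integer> runPass(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        SimReducer reducer = new SimReducer();
        grouped.forEach(reducer::reduce);
        return reducer.cleanup();
    }

    // Assumed pipeline: map -> combiner (reducer class) -> reducer (same class).
    static Map<String, Integer> simulate(String text) {
        List<Map.Entry<String, Integer>> mapOutput = new ArrayList<>();
        StringTokenizer itr = new StringTokenizer(text);
        while (itr.hasMoreTokens()) {
            mapOutput.add(new AbstractMap.SimpleEntry<>(itr.nextToken(), 1));
        }
        Map<String, Integer> combinerOutput = runPass(mapOutput);
        return runPass(new ArrayList<>(combinerOutput.entrySet()));
    }

    public static void main(String[] args) {
        simulate("Hello how zou zou zou zou how are you")
                .forEach((k, v) -> System.out.println(k + " " + v));
    }
}
```

Under these assumptions the second pass sums 9 + 5 + 4 + 4 = 22 as the "total", sees 4 distinct keys, none starting with "z" or "Z", and each appearing once, which reproduces the part-r-00000 contents from the question. If the driver does contain a call such as job.setCombinerClass(WordCountReducer.class), that would be the first thing to check; a reducer whose output is not of the same kind as its input generally cannot be reused as a combiner.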
