mapreduce 中的可写类

Posted

技术标签:

【中文标题】mapreduce 中的可写类【英文标题】:Writable Classes in mapreduce 【发布时间】:2021-02-21 22:00:14 【问题描述】:

如何使用从 hashset(docid 和 offset)到 reduce writable 的值,以便将 map writable 与 reduce writable 连接起来? 映射器(LineIndexMapper)工作正常,但在减速器(LineIndexReducer)中,当我键入以下内容时,我收到错误消息:它无法将字符串作为参数: context.write(key, new IndexRecordWritable("some string"); 虽然我在 ReduceWritable 中也有公共字符串 toString()。 我相信reducer的可写(IndexRecordWritable.java)中的哈希集可能没有正确获取值? 我有以下代码。

IndexMapRecordWritable.java
    
    

    
        import java.io.DataInput;
        import java.io.DataOutput;
        import java.io.IOException;
        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.io.Writable;
    
        public class IndexMapRecordWritable implements Writable 
    
            private LongWritable offset;
            private Text docid;
    
            public LongWritable getOffsetWritable() 
                return offset;
            
    
            public Text getDocidWritable() 
                return docid;
            
    
            public long getOffset() 
                return offset.get();
            
    
            public String getDocid() 
                return docid.toString();
            
    
            public IndexMapRecordWritable() 
                this.offset = new LongWritable();
                this.docid = new Text();
            
          
            public IndexMapRecordWritable(long offset, String docid) 
                this.offset = new LongWritable(offset);
                this.docid = new Text(docid);
            
            public IndexMapRecordWritable(IndexMapRecordWritable indexMapRecordWritable) 
                this.offset = indexMapRecordWritable.getOffsetWritable();
                this.docid = indexMapRecordWritable.getDocidWritable();
            
            @Override
            public String toString() 
    
                StringBuilder output = new StringBuilder()
                output.append(docid);
                output.append(offset);
                
                return output.toString();
    
            
    
            @Override
            public void write(DataOutput out) throws IOException 
 

            
    
            @Override
            public void readFields(DataInput in) throws IOException 


            
    
        
    
    
    
IndexRecordWritable.java
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.HashSet;
    import org.apache.hadoop.io.Writable;
    
    public class IndexRecordWritable implements Writable 
    
        // Save each index record from maps
        private HashSet<IndexMapRecordWritable> tokens = new HashSet<IndexMapRecordWritable>();
    
        public IndexRecordWritable() 
        
    
        public IndexRecordWritable(
                Iterable<IndexMapRecordWritable> indexMapRecordWritables) 
  
        
    
        @Override
        public String toString() 
    
            StringBuilder output = new StringBuilder();

            return output.toString();
    
        
    
        @Override
        public void write(DataOutput out) throws IOException 

        
   
        @Override
        public void readFields(DataInput in) throws IOException 

        
    
    
    

【问题讨论】:

你的代码中哪里有 context.write?请将错误消息发布为屏幕截图 从外观上看,您似乎已使用 job.setOutputKeyClass(Text.class); 在驱动程序类中将输出类设置为文本。所以,在你的 reducer 类中,类型基本上是 'extend Reducer' 啊对..你能把你的mapper和reducer也贴出来吗? 看起来问题确实出在 IndexRecordWritable 中。你能告诉我减速器的输出值是多少吗?如果可以的话给我一个例子.. 检查这个.. 在您的 context.write 中,您将值作为 IndexRecordWritable 的对象传递,并带有 string。但是,您的 IndexRecordWritable 构造函数不接受字符串,而是需要一个 iterable 对象。你能告诉我在 IndexRecordWritable 的构造函数中发生了什么吗?这 - public IndexRecordWritable(Iterable indexMapRecordWritables) /***/ /***/会发生什么? 【参考方案1】:

好的,这是我基于一些假设的答案。最终输出是一个文本文件,其中包含键和文件名,由逗号分隔,基于 reducer 类的 cmet 中关于前置条件和后置条件的信息。

在这种情况下,你真的不需要 IndexRecordWritable 类。您可以使用

简单地写入您的上下文
context.write(key, new Text(valueBuilder.substring(0, valueBuilder.length() - 1))); 

类声明行为

public class LineIndexReducer extends Reducer<Text, IndexMapRecordWritable, Text, Text>

不要忘记在驱动程序中设置正确的输出类。

这必须根据减速器类中的后置条件来达到目的。但是,如果你真的想为你的上下文编写一个 Text-IndexRecordWritable 对,有两种方法可以接近它 -

    使用字符串作为参数(基于您在 IndexRecordWritable 类构造函数不设计为接受字符串时尝试传递字符串)和 以 HashSet 作为参数(基于在 IndexRecordWritable 类中初始化的 HashSet)。

由于您的 IndexRecordWritable 类的构造函数并非设计为接受字符串作为输入,因此您不能传递字符串。因此,您得到的错误是您不能使用字符串作为参数。 Ps:如果你想让你的构造函数接受字符串,你的 IndexRecordWritable 类中必须有另一个构造函数,如下所示:

// Save each index record from maps
    private HashSet<IndexMapRecordWritable> tokens = new HashSet<IndexMapRecordWritable>();
    
    // to save the string
    private String value;

    public IndexRecordWritable() 
    

    public IndexRecordWritable(
            HashSet<IndexMapRecordWritable> indexMapRecordWritables) 
        /***/
    

    // to accpet string
    public IndexRecordWritable (String value)   
        this.value = value;
    

但如果你想使用 HashSet,那将是无效的。因此,方法#1 不能使用。不能传递字符串。

这给我们留下了方法#2。传递 HashSet 作为参数,因为您想使用 HashSet。在这种情况下,您必须在 reducer 中创建一个 HashSet,然后再将其作为参数传递给 context.write 中的 IndexRecordWritable。

为此,您的减速器必须如下所示。

@Override
    protected void reduce(Text key, Iterable<IndexMapRecordWritable> values, Context context) throws IOException, InterruptedException 
        //StringBuilder valueBuilder = new StringBuilder();

        HashSet<IndexMapRecordWritable> set = new HashSet<>();

        for (IndexMapRecordWritable val : values) 
            set.add(val);
            //valueBuilder.append(val);
            //valueBuilder.append(",");
        

        //write the key and the adjusted value (removing the last comma)
        //context.write(key, new IndexRecordWritable(valueBuilder.substring(0, valueBuilder.length() - 1)));
        context.write(key, new IndexRecordWritable(set));
        //valueBuilder.setLength(0);
    

你的 IndexRecordWritable.java 必须有这个。

// Save each index record from maps
    private HashSet<IndexMapRecordWritable> tokens = new HashSet<IndexMapRecordWritable>();

// to save the string
//private String value;

public IndexRecordWritable() 


public IndexRecordWritable(
        HashSet<IndexMapRecordWritable> indexMapRecordWritables) 
    /***/
    tokens.addAll(indexMapRecordWritables);

请记住,根据减速器的描述,这不是要求。

POST-CONDITION: emit the output a single key-value where all the file names are separated by a comma ",".  <"marcello", "a.txt@3345,b.txt@344,c.txt@785">

如果你仍然选择发出 (Text, IndexRecordWritable),记得在 IndexRecordWritable 中处理 HashSet 以获得所需的格式。

【讨论】:

以上是关于mapreduce 中的可写类的主要内容,如果未能解决你的问题,请参考以下文章

[MapReduce_5] MapReduce 中的 Combiner 组件应用

MapReduce 示例:减少 Hadoop MapReduce 中的侧连接

MapReduce中的shuffle过程

MapReduce在MES中的应用

mapreduce中的combinerpartitionerShuffle

[MapReduce_8] MapReduce 中的自定义分区实现