mapreduce 中的可写类
Posted
技术标签:
【中文标题】mapreduce 中的可写类【英文标题】:Writable Classes in mapreduce 【发布时间】:2021-02-21 22:00:14 【问题描述】:如何使用从 hashset(docid 和 offset)到 reduce writable 的值,以便将 map writable 与 reduce writable 连接起来? 映射器(LineIndexMapper)工作正常,但在减速器(LineIndexReducer)中,当我键入以下内容时,我收到错误消息:它无法将字符串作为参数: context.write(key, new IndexRecordWritable("some string"); 虽然我在 ReduceWritable 中也有公共字符串 toString()。 我相信reducer的可写(IndexRecordWritable.java)中的哈希集可能没有正确获取值? 我有以下代码。
IndexMapRecordWritable.java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
public class IndexMapRecordWritable implements Writable
private LongWritable offset;
private Text docid;
public LongWritable getOffsetWritable()
return offset;
public Text getDocidWritable()
return docid;
public long getOffset()
return offset.get();
public String getDocid()
return docid.toString();
public IndexMapRecordWritable()
this.offset = new LongWritable();
this.docid = new Text();
public IndexMapRecordWritable(long offset, String docid)
this.offset = new LongWritable(offset);
this.docid = new Text(docid);
public IndexMapRecordWritable(IndexMapRecordWritable indexMapRecordWritable)
this.offset = indexMapRecordWritable.getOffsetWritable();
this.docid = indexMapRecordWritable.getDocidWritable();
@Override
public String toString()
StringBuilder output = new StringBuilder()
output.append(docid);
output.append(offset);
return output.toString();
@Override
public void write(DataOutput out) throws IOException
@Override
public void readFields(DataInput in) throws IOException
IndexRecordWritable.java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashSet;
import org.apache.hadoop.io.Writable;
public class IndexRecordWritable implements Writable
// Save each index record from maps
private HashSet<IndexMapRecordWritable> tokens = new HashSet<IndexMapRecordWritable>();
public IndexRecordWritable()
public IndexRecordWritable(
Iterable<IndexMapRecordWritable> indexMapRecordWritables)
@Override
public String toString()
StringBuilder output = new StringBuilder();
return output.toString();
@Override
public void write(DataOutput out) throws IOException
@Override
public void readFields(DataInput in) throws IOException
【问题讨论】:
你的代码中哪里有 context.write?请将错误消息发布为屏幕截图 从外观上看,您似乎已使用 job.setOutputKeyClass(Text.class); 在驱动程序类中将输出类设置为文本。所以,在你的 reducer 类中,类型基本上是 'extend Reducer' 啊对..你能把你的mapper和reducer也贴出来吗? 看起来问题确实出在 IndexRecordWritable 中。你能告诉我减速器的输出值是多少吗?如果可以的话给我一个例子.. 检查这个.. 在您的 context.write 中,您将值作为 IndexRecordWritable 的对象传递,并带有 string。但是,您的 IndexRecordWritable 构造函数不接受字符串,而是需要一个 iterable 对象。你能告诉我在 IndexRecordWritable 的构造函数中发生了什么吗?这 - public IndexRecordWritable(Iterable好的,这是我基于一些假设的答案。最终输出是一个文本文件,其中包含键和文件名,由逗号分隔,基于 reducer 类的 cmet 中关于前置条件和后置条件的信息。
在这种情况下,你真的不需要 IndexRecordWritable 类。您可以使用
简单地写入您的上下文context.write(key, new Text(valueBuilder.substring(0, valueBuilder.length() - 1)));
类声明行为
public class LineIndexReducer extends Reducer<Text, IndexMapRecordWritable, Text, Text>
不要忘记在驱动程序中设置正确的输出类。
这必须根据减速器类中的后置条件来达到目的。但是,如果你真的想为你的上下文编写一个 Text-IndexRecordWritable 对,有两种方法可以接近它 -
-
使用字符串作为参数(基于您在 IndexRecordWritable 类构造函数不设计为接受字符串时尝试传递字符串)和
以 HashSet 作为参数(基于在 IndexRecordWritable 类中初始化的 HashSet)。
由于您的 IndexRecordWritable 类的构造函数并非设计为接受字符串作为输入,因此您不能传递字符串。因此,您得到的错误是您不能使用字符串作为参数。 Ps:如果你想让你的构造函数接受字符串,你的 IndexRecordWritable 类中必须有另一个构造函数,如下所示:
// Save each index record from maps
private HashSet<IndexMapRecordWritable> tokens = new HashSet<IndexMapRecordWritable>();
// to save the string
private String value;
public IndexRecordWritable()
public IndexRecordWritable(
HashSet<IndexMapRecordWritable> indexMapRecordWritables)
/***/
// to accpet string
public IndexRecordWritable (String value)
this.value = value;
但如果你想使用 HashSet,那将是无效的。因此,方法#1 不能使用。不能传递字符串。
这给我们留下了方法#2。传递 HashSet 作为参数,因为您想使用 HashSet。在这种情况下,您必须在 reducer 中创建一个 HashSet,然后再将其作为参数传递给 context.write 中的 IndexRecordWritable。
为此,您的减速器必须如下所示。
@Override
protected void reduce(Text key, Iterable<IndexMapRecordWritable> values, Context context) throws IOException, InterruptedException
//StringBuilder valueBuilder = new StringBuilder();
HashSet<IndexMapRecordWritable> set = new HashSet<>();
for (IndexMapRecordWritable val : values)
set.add(val);
//valueBuilder.append(val);
//valueBuilder.append(",");
//write the key and the adjusted value (removing the last comma)
//context.write(key, new IndexRecordWritable(valueBuilder.substring(0, valueBuilder.length() - 1)));
context.write(key, new IndexRecordWritable(set));
//valueBuilder.setLength(0);
你的 IndexRecordWritable.java 必须有这个。
// Save each index record from maps
private HashSet<IndexMapRecordWritable> tokens = new HashSet<IndexMapRecordWritable>();
// to save the string
//private String value;
public IndexRecordWritable()
public IndexRecordWritable(
HashSet<IndexMapRecordWritable> indexMapRecordWritables)
/***/
tokens.addAll(indexMapRecordWritables);
请记住,根据减速器的描述,这不是要求。
POST-CONDITION: emit the output a single key-value where all the file names are separated by a comma ",". <"marcello", "a.txt@3345,b.txt@344,c.txt@785">
如果你仍然选择发出 (Text, IndexRecordWritable),记得在 IndexRecordWritable 中处理 HashSet 以获得所需的格式。
【讨论】:
以上是关于mapreduce 中的可写类的主要内容,如果未能解决你的问题,请参考以下文章
[MapReduce_5] MapReduce 中的 Combiner 组件应用
MapReduce 示例:减少 Hadoop MapReduce 中的侧连接