Map-Reduce作业无法提供预期的分区文件
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Map-Reduce作业无法提供预期的分区文件相关的知识,希望对你有一定的参考价值。
在Map-Reduce作业中,我使用了五个不同的文件,其中我的数据集中包含两个类别P
和I
下的值。在找到I
特定值后,我将它们传递给I-part-r-00000文件,因此,对于P.我在reducer中使用MultipleOutputformat类来实现这一点。
我的Mapper类包含:
public class parserMapper extends Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String IPFLAG = "";
String[] element_data= value.toString.split(",");
if(element_data[0].toString().trim().equalsIgnoreCase("005010X222A1")){
IPFLAG = "P";
}
else {
IPFLAG = "I";
}
if (IPFLAG == "P") {
context.write(new Text(IPFLAG), new Text(theData));
}
else if (IPFLAG == "I") {
context.write(new Text(IPFLAG), new Text(theData));
}
else{
System.out.println("No category found");
}
}
public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
}
}
减速机类包括:
public class parserReducer extends Reducer<Text, Text, Text, Text> {
private MultipleOutputs multipleOutputs;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
multipleOutputs = new MultipleOutputs(context);
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
multipleOutputs.close();
}
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Object c = null;
try{
if (!(key.toString().isEmpty())) {
for (Text value : values) {
multipleOutputs.write(c, value, key.toString());
}
}
}
catch(Exception e){ System.out.println("Caught Exception: " + e.getMessage());}
}
}
和驱动程序代码包括=>
public class parserDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("textinputformat.record.delimiter", "~"+"
"+"ISA*");
Job job = new Job(conf);
job.setJobName("PARSER");
job.setJarByClass(parserDriver.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(parserMapper.class);
job.setReducerClass(parserReducer.class);
// job.setOutputFormatClass(TextOutputFormat.class);
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
// job.setOutputFormatClass(LazyOutputFormat.class);
/* MultipleOutputs.addNamedOutput(job, "P", TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "I", TextOutputFormat.class, Text.class, Text.class);
*/
// Pass as option -D mapred.reduce.tasks=<number>
job.setNumReduceTasks(3);
/* This line is to accept the input recursively */
//FileInputFormat.setInputDirRecursive(job, true);
FileInputFormat.addInputPath(job, "/Users/Mohit/input");
FileOutputFormat.setOutputPath(job, "/Users/Mohit/output");
/*
* Delete output file path if already exists
*/
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputFilePath)) {
fs.delete(outputFilePath, true);
}
return job.waitForCompletion(true) ? 0: 1;
}
}
通过这一切,我试图针对单个文件实现两个分区
file1 - > P-part-r00000,I-part-r00001
file2 - > P-part-r00002,I-part-r00003
。但我得到两个分区,反对作为此作业输入的所有文件。
file1,file2,file3,file4,file5 - > P-part-r00000,I-part-r00001
不知道我在这里缺少什么,如果有人可以帮忙吗?
答案
1)在您的驱动程序中将这些行添加到文件命名:
job.setOutputFormatClass(TextOutputFormat.class);
MultipleOutputs.addNamedOutput(job, "I", TextOutputFormat.class,
Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "P", TextOutputFormat.class,
Text.class, Text.class);
2)更改reducer以将每个值发送到具有特定名称的文件:
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
try{
if (!(key.toString().isEmpty())) {
for (Text value : values) {
multipleOutputs.write(key.toString(), key, value);
}
}
}
catch(Exception e){ System.out.println("Caught Exception: " + e.getMessage());}
}
3)将reducer的数量更改为2以获得正好2个文件。
以上是关于Map-Reduce作业无法提供预期的分区文件的主要内容,如果未能解决你的问题,请参考以下文章