如何在hadoop中组合两个独立映射器的结果?
Posted
技术标签:
【中文标题】如何在hadoop中组合两个独立映射器的结果?【英文标题】:How to combine the results of two separate mappers in hadoop? 【发布时间】:2021-08-19 17:30:19 【问题描述】:我想合并两个独立的mapper的结果,然后对合并的结果执行reducer。
我有两个文件。第一个文件具有以下列:A、B、C。第二个文件:A、D。
现在,两个映射器具有相同的签名:Mapper<LongWritable, Text, LongWritable, Text>
。如果满足特定条件,第一个映射器的输出是 KEY: new LongWritable(A)
和 VALUE: new Text(B, C)
。如果满足另一个条件,则输出第二个 KEY: new LongWritable(A)
和 VALUE: new Text(D)
。
现在,当我在减速器中从Iterable<Text>
输出值时,我得到要么 B+C 或 D。
鉴于两个集合有交集,我如何在reducer中获得给定A的B,C,D?
【问题讨论】:
到目前为止,您在工作链方面有哪些尝试?您是否尝试过使用一个Map
函数而不是两个函数来检查输入的列数并在内部搜索条件?如果您可以提供一些输入数据样本以及所需输出的样本,那将会容易得多。
【参考方案1】:
我有类似的用例,我通过向一个映射器添加令牌来解决问题,以便了解此记录来自减速器中的哪个文件(fileA 或 fileB),然后将它们分开。
假设fileA
是这样的:
A B C
C D D
A D D
A X Y
而fileB
就像:
A ALICE
C BOB
A ALICE
A BOB
我这样写映射器(看我在 reducer 中使用这个美元符号的每个值的开头添加了一个美元符号):
public static class FileAMapper extends Mapper<LongWritable, Text, Text, Text>
private String specifier = "$";
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
String line = value.toString();
String[] parts = line.split(" ");
String k = parts[0];
String v = specifier + parts[1] + " " + parts[2];
context.write(new Text(k), new Text(v));
下一个映射器:
public static class FileBMapper extends Mapper<LongWritable, Text, Text, Text>
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
String line = value.toString();
String[] parts = line.split(" ");
String k = parts[0];
String v = parts[1];
context.write(new Text(k), new Text(v));
现在是 reducer:我定义了两个数组列表,以便根据我在 mapper 中使用的美元符号来分隔每个值
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
List<String> left = new ArrayList<>();
List<String> right = new ArrayList<>();
values.forEach((e) ->
String temp = e.toString();
if (temp.startsWith("$"))
left.add(temp.substring(1));
else
right.add(temp);
);
left.forEach(l ->
right.forEach(r ->
System.out.println(String.format("%s %s %s", key.toString(), l, r))));
结果:
A B C ALICE
A B C ALICE
A B C BOB
A D D ALICE
A D D ALICE
A D D BOB
A X Y ALICE
A X Y ALICE
A X Y BOB
C D D BOB
司机:
Job job = new Job(new Configuration());
job.setJarByClass(Main.class);
Path fileA = new Path("input/fileA");
Path fileB = new Path("input/fileB");
Path outputPath = new Path("output");
MultipleInputs.addInputPath(job, fileA, TextInputFormat.class, FileAMapper.class);
MultipleInputs.addInputPath(job, fileB, TextInputFormat.class, FileBMapper.class);
FileOutputFormat.setOutputPath(job, outputPath);
job.setMapOutputKeyClass(Text.class);
job.setReducerClass(JoinReducer.class);
job.waitForCompletion(true);
【讨论】:
以上是关于如何在hadoop中组合两个独立映射器的结果?的主要内容,如果未能解决你的问题,请参考以下文章