如何在hadoop中组合两个独立映射器的结果?

Posted

技术标签:

【中文标题】如何在hadoop中组合两个独立映射器的结果?【英文标题】:How to combine the results of two separate mappers in hadoop? 【发布时间】:2021-08-19 17:30:19 【问题描述】:

我想合并两个独立的mapper的结果,然后对合并的结果执行reducer。

我有两个文件。第一个文件具有以下列:A、B、C。第二个文件:A、D。 现在,两个映射器具有相同的签名:Mapper<LongWritable, Text, LongWritable, Text>。如果满足特定条件,第一个映射器的输出是 KEY: new LongWritable(A)VALUE: new Text(B, C)。如果满足另一个条件,则输出第二个 KEY: new LongWritable(A)VALUE: new Text(D)

现在,当我在减速器中从Iterable<Text> 输出值时,我得到要么 B+C D。 鉴于两个集合有交集,我如何在reducer中获得给定A的B,C,D?

【问题讨论】:

到目前为止,您在工作链方面有哪些尝试?您是否尝试过使用一个 Map 函数而不是两个函数来检查输入的列数并在内部搜索条件?如果您可以提供一些输入数据样本以及所需输出的样本,那将会容易得多。 【参考方案1】:

我有类似的用例,我通过向一个映射器添加令牌来解决问题,以便了解此记录来自减速器中的哪个文件(fileA 或 fileB),然后将它们分开。

假设fileA 是这样的:

A B C
C D D
A D D
A X Y

fileB 就像:

A ALICE
C BOB
A ALICE
A BOB

我这样写映射器(看我在 reducer 中使用这个美元符号的每个值的开头添加了一个美元符号):

public static class FileAMapper extends Mapper<LongWritable, Text, Text, Text> 

        private String specifier = "$";
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
                String line = value.toString();
                String[] parts = line.split(" ");
                String k = parts[0];
                String v = specifier + parts[1] + " " + parts[2];
                context.write(new Text(k), new Text(v));
            
        

下一个映射器:

public static class FileBMapper extends Mapper<LongWritable, Text, Text, Text> 


    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
        String line = value.toString();
        String[] parts = line.split(" ");
        String k = parts[0];
        String v = parts[1];
        context.write(new Text(k), new Text(v));
    

现在是 reducer:我定义了两个数组列表,以便根据我在 mapper 中使用的美元符号来分隔每个值

protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException 
            List<String> left = new ArrayList<>();
            List<String> right = new ArrayList<>();
            values.forEach((e) -> 
                String temp = e.toString();
                if (temp.startsWith("$")) 
                    left.add(temp.substring(1));
                 else 
                    right.add(temp);
                
            );


            left.forEach(l -> 
                    right.forEach(r -> 
                            System.out.println(String.format("%s %s %s", key.toString(), l, r))));
        
    

结果:

A B C ALICE
A B C ALICE
A B C BOB
A D D ALICE
A D D ALICE
A D D BOB
A X Y ALICE
A X Y ALICE
A X Y BOB
C D D BOB

司机:

Job job = new Job(new Configuration());
job.setJarByClass(Main.class);

Path fileA = new Path("input/fileA");
Path fileB = new Path("input/fileB");
Path outputPath =   new Path("output");

MultipleInputs.addInputPath(job, fileA, TextInputFormat.class,  FileAMapper.class);
MultipleInputs.addInputPath(job, fileB, TextInputFormat.class,  FileBMapper.class);
FileOutputFormat.setOutputPath(job, outputPath);
job.setMapOutputKeyClass(Text.class);
job.setReducerClass(JoinReducer.class);
job.waitForCompletion(true);

【讨论】:

以上是关于如何在hadoop中组合两个独立映射器的结果?的主要内容,如果未能解决你的问题,请参考以下文章

如何在MR作业中配置映射以批量执行?

Hadoop:在迭代映射作业之间维护内存缓存

具有单个映射器和两个不同减速器的 hadoop 作业

如果映射器在中途失败并且 Hadoop 重试该映射器,自定义计数器会发生啥

MyBatis映射器总结

当我的类中有属性具有默认构造函数来提供值自动映射器时,如何映射?