使用 ChainMapper 的 TotalOrderPartion

Posted

技术标签:

【中文标题】使用 ChainMapper 的 TotalOrderPartion【英文标题】:TotalOrderPartion with ChainMapper 【发布时间】:2016-03-11 07:14:06 【问题描述】:

我有一个带有 2 个映射器的 ChainMapper。我正在尝试在链中的最后一个映射器上执行 TotalOrderPartition,但没有取得很大成功。

有没有办法根据链中第 N 个映射器上的一些采样来强制执行分区?

public class WordCountChain extends Configured implements Tool

    @Override
    public int run(String[] args) throws Exception 
    
        Job job = new Job(getConf(), "Word Count V1 (Chain)");
        job.setJarByClass(getClass());

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        /*********** First Mapper ***********/
        Configuration wcpMapperConf = new Configuration(false);
        ChainMapper.addMapper(job, WordCountPreparationMapper.class, LongWritable.class, Text.class, Text.class, IntWritable.class, wcpMapperConf);

        /*********** Second Mapper ***********/
        Configuration wcMapperConf = new Configuration(false);
        ChainMapper.addMapper(job, Mapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, wcMapperConf);

        /******* This enforces the Sampling/Partitioning over the First Mapper *******/
        //job.setInputFormatClass(SequenceFileInputFormat.class);
        //InputSampler.Sampler<Text, IntWritable> sampler = new InputSampler.RandomSampler<Text, IntWritable>(0.1, 10000, 10);
        //InputSampler.writePartitionFile(job, sampler);
        //job.addCacheFile( new URI( TotalOrderPartitioner.getPartitionFile(getConf()) ) );

        job.setNumReduceTasks(10);
        job.setReducerClass(WordCountReducer.class);
        return (job.waitForCompletion(true) ? 0 : 1);
     

     public static void main(String[] args) throws Exception 
     
        int exitCode = ToolRunner.run(new WordCountChain(), args);
        System.exit(exitCode);
     

【问题讨论】:

我是否以正确的方式看待它,还是应该以不同的方式看待它?? 【参考方案1】:

不幸的是,RandomSampler 在作业开始之前就运行了,实际上它在您调用时运行

InputSampler.writePartitionFile(job, sampler);

这意味着它不会在任何 Mapper 的输出上运行,而是在作业的输入数据集上运行。

如果您需要根据第 N 个 Mapper 的输出进行分区,您可以将您的作业拆分为两个作业,一个仅映射作业和一个 mapreduce 作业。第一个将映射器链运行到第 N 个映射器,然后只存储它的输出。第二个作业将根据输入(将是第 N 个 Mapper 的输出)进行采样和分区,然后运行其余的 Mapper 和您的 Reducer。

【讨论】:

以上是关于使用 ChainMapper 的 TotalOrderPartion的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop的ChainMapper和ChainReducer使用案例(链式处理)

MapReduce ChainMapper/ChainReducer

秒表的使用方法和技巧(秒表的使用方法)

使用“使用严格”作为“使用强”的备份

在使用加载数据流步骤的猪中,使用(使用 PigStorage)和不使用它有啥区别?

Sqlmap的使用