mapreduce 的二次排序

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了mapreduce 的二次排序相关的知识,希望对你有一定的参考价值。

  • 一: 理解二次排序的功能, 使用自己理解的方式表达(包括自定义数据类型,分区,分组,排序)
  • 二: 编写实现二次排序功能, 提供源码文件。
  • 三:理解mapreduce join 的几种 方式,编码实现reduce join,提供源代码,说出思路。

一: 二次排序 使用自己理解的方式表达(包括自定义数据类型,分区,分组,排序)

1.1 二次排序的功能

   1. 当客户端提交一个作业的时候,hadoop 会开启yarn 接受进行数据拷贝处理,之后交友有yarn 框架上的启动服务resourcemanager 接收,同时指派任务给nomanager ,nodemanger 会调用开 applicationmaster 处理任务,同时在 container 分配好要处理任务环境的抽象,封装了CPU、内存等多维资源以及环境变量、启动命令等任务运行相关的信息.之后输入数据,在输入数据进行数据inputspilt分割,人很掉用mapper基类将数据分割成,key-values键值对之后调用map()方法,调用该方法后会对keys-values 对分割,之后经过shuffle 过程map 的输出,就是reduce 端的输入 经过reduce段数据即可输出到hdfs 上面。 二次排序 就是首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序。
   2. 在shuffle 过程中,会对数据进行分割(spilt),分区(partitioner),排序(sort),合并(combine),压缩(compress),分组(group) 之后输出到reduce端。

1.2 shuffle 对job 格式定义:

      1) partitioner
                job.setPartitionerClass(FirstPartitioner.class);
       2) sort
                job.setSortComparatorClass(cls);

     3) combine
            job.setCombinerClass(cls);
     4) compress
         set by configuration
     5) group
    job.setGroupingComparatorClass(FirstGroupingComparator.class);

二: 编写实现二次排序功能, 提供源码文件。

2.1 二次排序格式要求

  1. 利用mapreduce 默认会对key 进行排序的方法对job 进行第一次排序
  2. 把key和需要排序的第二个字段进行组合

2.2 二次排序Java的代码

SecoundarySortMapReduce.java

package org.apache.hadoop.studyhadoop.sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * 
 * @author zhangyy
 *
 */
public class SecondarySortMapReduce extends Configured implements Tool{

    // step 1: mapper class
    /**
     * public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     */
    public static class SecondarySortMapper extends //
        Mapper<LongWritable,Text,PairWritable,IntWritable>{

        private PairWritable mapOutputKey = new PairWritable() ;
        private IntWritable mapOutputValue = new IntWritable() ;

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // line value
            String lineValue = value.toString();
            // split
            String[] strs = lineValue.split(",") ;
            // invalidate
            if(2 != strs.length){
                return ;
            }

            // set map  output key and value
            mapOutputKey.set(strs[0], Integer.valueOf(strs[1]));
            mapOutputValue.set(Integer.valueOf(strs[1]));

            // output
            context.write(mapOutputKey, mapOutputValue);
        }
    }

    // step 2: reducer class
    /**
     * public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
     */
    public static class SecondarySortReducer extends //
        Reducer<PairWritable,IntWritable,Text,IntWritable>{
        private Text outputKey = new Text() ;

        @Override
        public void reduce(PairWritable key, Iterable<IntWritable> values,
                Context context)
                throws IOException, InterruptedException {
            // set output key
            outputKey.set(key.getFirst());

            // iterator
            for(IntWritable value : values){
                // output
                context.write(outputKey, value);
            }
        }
    }

    // step 3: driver
    public int run(String[] args) throws Exception {
        // 1: get configuration
        Configuration configuration = super.getConf() ;

        // 2: create job
        Job job = Job.getInstance(//
            configuration, //
            this.getClass().getSimpleName()//
        );
        job.setJarByClass(this.getClass());

        // 3: set job
        // input  -> map  -> reduce -> output
        // 3.1: input
        Path inPath = new Path(args[0]) ;
        FileInputFormat.addInputPath(job, inPath);

        // 3.2: mapper
        job.setMapperClass(SecondarySortMapper.class);
        job.setMapOutputKeyClass(PairWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

// ===========================Shuffle======================================     
        // 1) partitioner
                job.setPartitionerClass(FirstPartitioner.class);
        // 2) sort
//              job.setSortComparatorClass(cls);
        // 3) combine
//              job.setCombinerClass(cls);
        // 4) compress
            // set by configuration
        // 5) group
        job.setGroupingComparatorClass(FirstGroupingComparator.class);
// ===========================Shuffle======================================     

        // 3.3: reducer
        job.setReducerClass(SecondarySortReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        // set reducer number
        job.setNumReduceTasks(2);

        // 3.4: output
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);

        // 4: submit job 
        boolean isSuccess = job.waitForCompletion(true);

        return isSuccess ? 0 : 1 ;
    }

    public static void main(String[] args) throws Exception {
        args = new String[]{
                "hdfs://namenode01.hadoop.com:8020/input/sort" ,//
                "hdfs://namenode01.hadoop.com:8020/output"
            };      

        // create configuration
        Configuration configuration = new Configuration();

        // run job
        int status = ToolRunner.run(//
            configuration, //
            new SecondarySortMapReduce(), //
            args
        ) ;

        // exit program
        System.exit(status);
    }
}

PairWritable.java

package org.apache.hadoop.studyhadoop.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class PairWritable implements WritableComparable<PairWritable> {

    private String first;
    private int second;

    public PairWritable() {
    }

    public PairWritable(String first, int second) {
        this.set(first, second);
    }

    public void set(String first, int second) {
        this.first = first;
        this.setSecond(second);
    }

    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        return second - Integer.MAX_VALUE;
    }

    public void setSecond(int second) {
        this.second = second + Integer.MAX_VALUE;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(first);
        out.writeInt(second);
    }

    public void readFields(DataInput in) throws IOException {
        this.first = in.readUTF();
        this.second = in.readInt();
    }

    public int compareTo(PairWritable o) {
        // compare first
        int comp =this.first.compareTo(o.getFirst()) ;

        // eqauls
        if(0 != comp){
            return comp ;
        }

        // compare
        return Integer.valueOf(this.getSecond()).compareTo(Integer.valueOf(o.getSecond())) ;
    }

}
FirstPartitioner.java

package org.apache.hadoop.studyhadoop.sort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstPartitioner extends Partitioner<PairWritable,IntWritable> {

    @Override
    public int getPartition(PairWritable key, IntWritable value,
            int numPartitions) {
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

}

FirstGroupingComparator.java

package org.apache.hadoop.studyhadoop.sort;

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

public class FirstGroupingComparator implements RawComparator<PairWritable> {

    // object compare
    public int compare(PairWritable o1, PairWritable o2) {
        return o1.getFirst().compareTo(o2.getFirst());
    }

    // bytes compare
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return WritableComparator.compareBytes(b1, 0, l1 - 4, b2, 0, l2 - 4);
    }

}

2.3 输出测试

上传数据处理:
 hdfs dfs -put sort /input

运行输出:

技术分享图片
技术分享图片

技术分享图片

三:理解mapreduce join 的几种 方式,编码实现reduce join,提供源代码,说出思路。

3.1 mapreduce join 有三种:

   3.1.1 map 的端的join 
   map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。
Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。
为了支持文件的复制,Hadoop提供了一个类DistributedCache 去实现。
   3.1.2 reduce 的端的join 
   在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0表示来自文件File1,tag=2表示来自文件File2。即:map阶段的主要任务是对不同文件中的数据打标签。
在reduce阶段,reduce函数获取key相同的来自File1和File2文件的value list, 然后对于同一个key,对File1和File2中的数据进行join(笛卡尔乘积)。即:reduce阶段进行实际的连接操作
   3.1.3 SemiJoin
   SemiJoin,也叫半连接,是从分布式数据库中借鉴过来的方法。它的产生动机是:对于reduce side join,跨机器的数据传输量非常大,这成了join操作的一个瓶颈,如果能够在map端过滤掉不会参加join操作的数据,则可以大大节省网络IO。
实现方法很简单:选取一个小表,假设是File1,将其参与join的key抽取出来,保存到文件File3中,File3文件一般很小,可以放到内存中。在map阶段,使用DistributedCache将File3复制到各个TaskTracker上,然后将File2中不在File3中的key对应的记录过滤掉,剩下的reduce阶段的工作与reduce side join相同

3.2 编程代码:

DataJoinMapReduce.java


DataJoinMapReduce.java

package org.apache.hadoop.studyhadoop.join;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * 
 * @author zhangyy
 * 
 */
public class DataJoinMapReduce extends Configured implements Tool {
    // step 1 : mapper
    /**
     * public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     */
    public static class WordCountMapper extends //
            Mapper<LongWritable, Text, LongWritable, DataJoinWritable> {

        private LongWritable mapOutputKey = new LongWritable();
        private DataJoinWritable mapOutputValue = new DataJoinWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // split
            String[] strs = value.toString().split(",");

            // invalidate
            if ((3 != strs.length) && (4 != strs.length)) {
                return;
            }

            // set mapoutput key
            Long cid = Long.valueOf(strs[0]);
            mapOutputKey.set(cid);

            // set name
            String name = strs[1];

            // customer
            if (3 == strs.length) {
                String phone = strs[2];
                mapOutputValue.set("customer", name + "," + phone);
            }
            // order
            if (4 == strs.length) {
                String price = strs[2];
                String date = strs[3];
                mapOutputValue.set("order", name + "," + price + "," + date);
            }

            context.write(mapOutputKey, mapOutputValue);
        }

    }

    // step 2 : reducer
    public static class WordCountReducer extends //
            Reducer<LongWritable, DataJoinWritable, NullWritable, Text> {

        private Text outputValue = new Text();

        @Override
        public void reduce(LongWritable key, Iterable<DataJoinWritable> values,
                Context context) throws IOException, InterruptedException {

            String customerInfo = new String();
            List<String> orderList = new ArrayList<String>();

            for (DataJoinWritable value : values) {
                if ("customer".equals(value.getTag())) {
                    customerInfo = value.getData();
                } else if ("order".equals(value.getTag())) {
                    orderList.add(value.getData());
                }
            }

            for (String order : orderList) {
                outputValue.set(key.toString() + "," + customerInfo + ","
                        + order);
                context.write(NullWritable.get(), outputValue);
            }

        }
    }

    // step 3 : job

    public int run(String[] args) throws Exception {

        // 1 : get configuration
        Configuration configuration = super.getConf();

        // 2 : create job
        Job job = Job.getInstance(//
                configuration,//
                this.getClass().getSimpleName());
        job.setJarByClass(DataJoinMapReduce.class);

        // job.setNumReduceTasks(tasks);

        // 3 : set job
        // input --> map --> reduce --> output
        // 3.1 : input
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);

        // 3.2 : mapper
        job.setMapperClass(WordCountMapper.class);
        // TODO
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(DataJoinWritable.class);

        // ====================shuffle==========================
        // 1: partition
        // job.setPartitionerClass(cls);
        // 2: sort
        // job.setSortComparatorClass(cls);
        // 3: combine
        // job.setCombinerClass(cls);
        // 4: compress
        // set by configuration
        // 5 : group
        // job.setGroupingComparatorClass(cls);

        // ====================shuffle==========================

        // 3.3 : reducer
        job.setReducerClass(WordCountReducer.class);
        // TODO
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // 3.4 : output
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);

        // 4 : submit job
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;

    }

    public static void main(String[] args) throws Exception {

        args = new String[] {
                "hdfs://namenode01.hadoop.com:8020/join",
                "hdfs://namenode01.hadoop.com:8020/output3/" 
                };

        // get configuration
        Configuration configuration = new Configuration();

        // configuration.set(name, value);

        // run job
        int status = ToolRunner.run(//
                configuration,//
                new DataJoinMapReduce(),//
                args);

        // exit program
        System.exit(status);
    }

}
DataJoinWritable.java

package org.apache.hadoop.studyhadoop.join;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class DataJoinWritable implements Writable {

    private String tag ;
    private String data ;

    public DataJoinWritable() {

    }

    public DataJoinWritable(String tag, String data) {
        this.set(tag, data); 
    }
    public void set(String tag, String data) {
        this.setTag(tag);
        this.setData(data);
    }
    public String getTag() {
        return tag;
    }

    public void setTag(String tag) {
        this.tag = tag;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((data == null) ? 0 : data.hashCode());
        result = prime * result + ((tag == null) ? 0 : tag.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        DataJoinWritable other = (DataJoinWritable) obj;
        if (data == null) {
            if (other.data != null)
                return false;
        } else if (!data.equals(other.data))
            return false;
        if (tag == null) {
            if (other.tag != null)
                return false;
        } else if (!tag.equals(other.tag))
            return false;
        return true;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.getTag());
        out.writeUTF(this.getData());
    }

    public void readFields(DataInput in) throws IOException {
        this.setTag(in.readUTF());
        this.setData(in.readUTF());
    }

    @Override
    public String toString() {
        return tag + "," + data ;
    }

}

3.3 运行代码测试

上传文件:
hdfs dfs -put customers.txt /join 
hdfs dfs -put orders.txt /join

运行结果:

技术分享图片
技术分享图片
技术分享图片

以上是关于mapreduce 的二次排序的主要内容,如果未能解决你的问题,请参考以下文章

关于MapReduce二次排序的一点解答

重新认识mapreduce

MapReduce排序之 二次排序

Hadoop MapReduce编程 API入门系列之二次排序

Spark的二次排序

MapReduce二次排序