大数据框架之Hadoop:MapReduceMapReduce框架原理——Join多种应用

Posted yiluohan0307

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据框架之Hadoop:MapReduceMapReduce框架原理——Join多种应用相关的知识,希望对你有一定的参考价值。

3.7.1Reduce Join

1、工作原理

Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。

Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经达标)分开,最后进行合并就ok了。

3.7.2Reduce Join案例实操

1、需求

将商品信息表中数据根据商品pid合并到订单数据表中。

1)输入

order.txt

1001	01	1
1002	02	2
1003	03	3
1004	01	4
1005	02	5
1006	03	6

pd.txt

01	小米
02	华为
03	格力

2)输出

1001	小米	1	
1001	小米	1	
1002	华为	2	
1002	华为	2	
1003	格力	3	
1003	格力	3

2、需求分析

通过将关联条件作为Map输出的key,将两表满足Join条件的数据并携带数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联,如图4-20所示。

3、代码实现

1)创建商品和订合并后的Bean类

package com.cuiyf41.join;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class TableBean implements Writable 
    private String order_id; // 订单id
    private String p_id;      // 产品id
    private int amount;       // 产品数量
    private String pname;     // 产品名称
    private String flag;      // 表的标记

    public TableBean() 
        super();
    

    public TableBean(String order_id, String p_id, int amount, String pname, String flag) 
        super();
        this.order_id = order_id;
        this.p_id = p_id;
        this.amount = amount;
        this.pname = pname;
        this.flag = flag;
    

    @Override
    public void write(DataOutput out) throws IOException 
        out.writeUTF(order_id);
        out.writeUTF(p_id);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(flag);
    

    @Override
    public void readFields(DataInput in) throws IOException 
        order_id = in.readUTF();
        p_id = in.readUTF();
        amount = in.readInt();
        pname = in.readUTF();
        flag = in.readUTF();
    

    @Override
    public String toString() 
        return order_id + "\\t" + pname + "\\t" + amount;
    

    public String getOrder_id() 
        return order_id;
    

    public void setOrder_id(String order_id) 
        this.order_id = order_id;
    

    public String getP_id() 
        return p_id;
    

    public void setP_id(String p_id) 
        this.p_id = p_id;
    

    public int getAmount() 
        return amount;
    

    public void setAmount(int amount) 
        this.amount = amount;
    

    public String getPname() 
        return pname;
    

    public void setPname(String pname) 
        this.pname = pname;
    

    public String getFlag() 
        return flag;
    

    public void setFlag(String flag) 
        this.flag = flag;
    

2)编写TableMapper类

package com.cuiyf41.join;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> 

    String name;
    TableBean bean = new TableBean();
    Text k = new Text();

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException, InterruptedException 
        // 1 获取输入文件切片
        FileSplit split = (FileSplit) context.getInputSplit();

        // 2 获取输入文件名称
        name = split.getPath().getName();
    

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException, InterruptedException 

        // 1 获取输入数据
        String line = value.toString();

        // 2切割
        String[] fields = line.split("\\t");

        // 3 不同文件分别处理
        if (name.startsWith("order")) // 订单表处理
            // 3.1 封装bean对象
            bean.setOrder_id(fields[0]);
            bean.setP_id(fields[1]);
            bean.setAmount(Integer.parseInt(fields[2]));
            bean.setPname("");
            bean.setFlag("order");

            k.set(fields[1]);
         else // 产品表处理

            // 3.2 封装bean对象
            bean.setP_id(fields[0]);
            bean.setPname(fields[1]);
            bean.setFlag("pd");
            bean.setAmount(0);
            bean.setOrder_id("");

            k.set(fields[0]);
        

        // 4 写出
        context.write(k, bean);
    

3)编写TableReducer类

package com.cuiyf41.join;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;

public class TableReducer extends Reducer<Text,TableBean, TableBean, NullWritable> 

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Reducer<Text, TableBean, TableBean, NullWritable>.Context context) throws IOException, InterruptedException 
        // 1准备存储订单的集合
        ArrayList<TableBean> orderBeans = new ArrayList<>();

        // 2 准备bean对象
        TableBean pdBean = new TableBean();

        for (TableBean bean : values) 
            if ("order".equals(bean.getFlag())) // 订单表

                // 拷贝传递过来的每条订单数据到集合中
                TableBean tmpBean = new TableBean();

                try 
                    BeanUtils.copyProperties(tmpBean, bean);
                 catch (Exception e) 
                    e.printStackTrace();
                

                orderBeans.add(tmpBean);
             else // 产品表

                try 
                    // 拷贝传递过来的产品表到内存中
                    BeanUtils.copyProperties(pdBean, bean);
                 catch (Exception e) 
                    e.printStackTrace();
                
            
        

        // 3 表的拼接
        for(TableBean bean:orderBeans) 
            bean.setPname(pdBean.getPname());

            // 4 数据写出去
            context.write(bean, NullWritable.get());
        
    

4)编写TableDriver类

package com.cuiyf41.join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TableDriver 
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException 
        // 0 根据自己电脑路径重新配置
        args = new String[]"e:/input/inputtable","e:/output1";

        // 1 获取配置信息,或者job对象实例
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 指定本程序的jar包所在的本地路径
        job.setJarByClass(TableDriver.class);

        // 3 指定本业务job要使用的Mapper/Reducer业务类
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        // 4 指定Mapper输出数据的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);

        // 5 指定最终输出的数据的kv类型
        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        // 6 指定job的输入原始文件所在目录
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);
        // 如果输出路径存在,则进行删除
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) 
            fs.delete(output,true);
        
        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);

        // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    

4、测试

运行程序查看结果

1001	小米	1	
1001	小米	1	
1002	华为	2	
1002	华为	2	
1003	格力	3	
1003	格力	3

5、总结

缺点:这种方式中,合并的操作是在Reduce阶段完成的,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce杰顿极易产生数据倾斜。

解决方案:Map端实现数据合并。

Mapper:// 1 获取输入数据

3.7.3Map Join

一、概述

1)使用场景

Map Join适用于一张表十分小、一张表很大的场景。

2)优点

思考:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办?

在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。

3)具体办法:采用DistributedCache

(1)在Mapper的setup阶段,将文件读取到缓存集合中。

(2)在驱动函数中加载缓存。

// 缓存普通文件到Task运行节点。
job.addCacheFile(new URI("file://e:/cache/pd.txt"))

二、Map Join案例实操

1、需求

将商品信息表中数据根据商品pid合并到订单数据表中。

1)输入数据

order.txt

1001	01	1
1002	02	2
1003	03	3
1004	01	4
1005	02	5
1006	03	6

pd.txt

01	小米
02	华为
03	格力

2)输出数据

2、需求分析

MapJoin适用于关联表中有小表的情形。

3、实现代码

(1)先在驱动模块中添加缓存文件

package com.cuiyf41.mapjoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class DistributedCacheDriver 
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException 
        // 0 根据自己电脑路径重新配置
        args = new String[]"e:/input/inputtable2", "e:/output1";

        // 1 获取job信息
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 设置加载jar包路径
        job.setJarByClass(DistributedCacheDriver.class);

        // 3 关联map
        job.setMapperClass(DistributedCacheMapper.class);

        // 4 设置最终输出数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class1 MapReduce 框架原理
  • 2 Hadoop 数据压缩

  • 1 MapReduce 框架原理

    1.1 InputFormat 数据输入

    1.1.1 切片与 MapTask并行度决定机制

    问题引出

    MapTask的并行度决定 Map阶段的任务处理并发度,进而影响到整个 Job的处理速度。
    思考:1G的数据,启动 8个 MapTask可以提高集群的并发处理能力。那么1K的数据,也启动 8个MapTask,会提高集群性能吗? MapTask并行任务是否越多越好呢? 哪些因素影响了 MapTask并行度?

    MapTask并行度决定机制

    数据块: Block是 HDFS物理上把数据分成一块一块。 数据块是 HDFS存储数据单位 。
    数据切片: 数据切片只是在逻辑上对输入进行分片, 并不会在磁盘上将其切分成片进行存储 。 数据切片是 MapReduce程序计算输入数据的单位 ,一个切片会对应启动一个 MapTask。

    1.1.2 Job 提交流程源码和切片源码详解

    Job 提交流程源码详解












    FileInputFormat 切片源码解析(input.getSplits(job))




    1.1.3 FileInputFormat 切片机制

    切片机制

    (1)简单地按照文件的内容长度进行切片
    (2)切片大小,默认等于Block大小
    (3)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

    源码中计算切片大小的公式

    Math.max(minSize, Math.min(maxSize, blockSize));
    mapreduce.input.fileinputformat.split.minsize=1 默认值为1
    mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默认值Long.MAXValue
    因此,默认情况下,切片大小=blocksize。

    切片大小设置

    maxsize(切片最大值):参数如果调得比blockSize小,则会让切片变小,而且就等于配置的这个参数的值。
    minsize(切片最小值):参数调的比blockSize大,则可以让切片变得比blockSize还大。

    1.1.4 TextInputFormat

    FileInputFormat 实现类

    思考:在运行MapReduce 程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。那么,针对不同的数据类型,MapReduce 是如何读取这些数据的呢?

    FileInputFormat 常见的接口实现类包括:TextInputFormatKeyValueTextInputFormatNLineInputFormatCombineTextInputFormat 和自定义InputFormat 等。

    TextInputFormat

    TextInputFormat 是默认的FileInputFormat 实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量,LongWritable类型。值是这行的内容,不包括任何行终止符( 换行符和回车符),Text类型。
    以下是一个示例,比如,一个分片包含了如下4条文本记录。

    Rich learning form
    Intelligent learning engine
    Learning more convenient
    From the real demand for more close to the enterprise
    

    每条记录表示为以下键/值对:

    (0,Rich learning from)
    (20 ,Intelligent learning engine)
    (49 ,Learning more convenient)
    (74 ,From the real demand for more close to the enterprise)
    

    1.1.5 CombineTextInputFormat切片机制

    框架默认的 TextInputFormat切片机制是对任务按文件规划切片,不管文件多小都会是一个单独的切片,都会交给一个 MapTask 这样如果有大量小文件就会产生大量的MapTask,处理效率极其低下。

    应用场景
    CombineTextInputFormat用于小文件过多的场景, 它可以将多个小文件从逻辑上规划到一个切片中,这样多个小文件就可以交给一个 MapTask处理 。

    虚拟存储切片最大值设置
    CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4m
    注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。

    切片机制
    生成切片过程包括虚拟存储过程和切片过程两部分 。

    (1)虚拟存储过程:
    将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize 值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2 倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。
    例如setMaxInputSplitSize 值为4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余大小为4.02M,如果按照4M 逻辑划分,就会出现0.02M 的小的虚拟存储文件,所以将剩余的4.02M 文件切分成(2.01M 和2.01M)两个文件。

    (2)切片过程:
    (a)判断虚拟存储的文件大小是否大于setMaxInputSplitSize 值,大于等于则单独形成一个切片。
    (b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
    (c)测试举例:有4 个小文件大小分别为1.7M、5.1M、3.4M 以及6.8M 这四个小文件,则虚拟存储之后形成6 个文件块,大小分别为:1.7M,(2.55M、2.55M),3.4M 以及(3.4M、3.4M),最终会形成3 个切片,大小分别为:(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M

    1.1.6 CombineTextInputFormat 案例实操

    需求

    将输入的大量小文件合并成一个切片统一处理。
    (1)输入数据。准备4 个小文件,a.txt,b.txt,c.txt,d.txt。
    (2)期望一个切片处理4 个文件。

    实现过程

    (1)不做任何处理,运行WordCount 案例程序,观察切片个数为4。

    (2)在WordcountDriver 中增加如下代码,运行程序,并观察运行的切片个数为3。

    // 如果不设置InputFormat,它默认用的是TextInputFormat.class
            job.setInputFormatClass(CombineTextInputFormat.class);
    
    // 虚拟存储切片最大值设置4M
            CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
    


    (3)在WordcountDriver 中增加如下代码,运行程序,并观察运行的切片个数为1。

    // 如果不设置InputFormat,它默认用的是TextInputFormat.class
            job.setInputFormatClass(CombineTextInputFormat.class);
    
    // 虚拟存储切片最大值设置20M
            CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);
    

    1.2 MapReduce 工作流程



    上面的流程是整个 MapReduce最全工作流程,但是 Shuffle过程只是从第 7步开始到第 16步结束,具体 Shuffle过程详解,如下:

    (1)MapTask 收集我们的 map()方法输出的 kv对,放到内存缓冲区中;
    (2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件;
    (3)多个溢出文件会被合并成大的溢出文件;
    (4)在溢出过程及合并的过程中,都要调用 Partitioner进行分区和针对key 进行排序;
    (5)ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据;
    (6)ReduceTask会抓取到同一个分区的来自不同 MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序);
    (7)合并成大文件后,Shuffle的过程也就结束了,后面进入 ReduceTask 的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)。

    注意

    (1)Shuffle中的缓冲区大小会影响到 MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘 io的次数越少,执行速度就越快。
    (2)缓冲区的大小可以通过参数调整。参数:mapreduce.task.io.sort.mb 默认100M。

    1.3 Shuffle 机制

    1.3.1 Shuffle 机制

    Map 方法之后,Reduce 方法之前的数据处理过程称之为Shuffle。

    1.3.2 Partition 分区

    问题引出

    要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)。

    默认Partitioner分区

    默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。

    自定义Partitioner步骤

    (1)自定义类继承Partitioner,重写getPartition()方法

    public class CustomPartitioner extends Partitioner<Text,FlowBean>
    	@Override
    	public int getPartition(Text key, FlowBean value, int numPartitions) 
    	// 控制分区代码逻辑
    	… …
    		return partition;
    	
    
    

    (2)在Job驱动中,设置自定义Partitioner

    job.setPartitionerClass(CustomPartitioner.class);
    

    (3)自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask

    job.setNumReduceTasks(5);
    

    分区总结

    (1)如果ReduceTask的数量 > getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
    (2)如果1< ReduceTask的数量 < getPartition的结果数,则有一部分分区数据无处安放,会Exception;
    (3)如果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件part-r-00000;
    (4)分区号必须从零开始,逐一累加。

    案例分析

    例如:假设自定义分区数为5,则
    (1)job.setNumReduceTasks(1); 会正常运行,只不过会产生一个输出文件会报错
    (2)job.setNumReduceTasks(2); 会报错
    (3)job.setNumReduceTasks(6); 大于5,程序会正常运行,会产生空文件

    1.3.3 Partition 分区案例实操

    需求

    将统计结果按照手机归属地不同省份输出到不同文件中(分区)
    (1)输入数据

    (2)期望输出数据:手机号136、137、138、139 开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。

    需求分析

    分区类代码

    package com.Tom.mapreduce.partitioner2;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class ProvincePartitioner extends Partitioner<Text, FlowBean> 
        @Override
        public int getPartition(Text text, FlowBean flowBean, int numPartitions) 
            // text是手机号
    
            String phone = text.toString();
    
            String prePhone = phone.substring(0, 3);
    
            int partition;
    
            if("136".equals(prePhone))
                partition = 0;
            else if("137".equals(prePhone))
                partition = 1;
            else if("138".equals(prePhone))
                partition = 2;
            else if("139".equals(prePhone))
                partition = 3;
            else 
                partition = 4;
            
            return partition;
        
    
    
    

    编写Mapper类

    package com.Tom.mapreduce.partitioner2;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> 
    
        private Text outK = new Text();
        private FlowBean outV = new FlowBean();
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
    
            // 1 获取一行
            // 1    13736230513 192.196.100.1   www.atguigu.com 2481    24681   200
            String line = value.toString();
    
            // 2 切割
            // 1,13736230513,192.196.100.1,www.atguigu.com,2481,24681,200
            // 2    138846544121 192.196.100.2      264 0   200
            String[] split = line.split("\\t");
    
            // 3 抓取想要的数据
            // 手机号:13736230513
            // 上行流量和下行流量:2481,24681
            String phone = split[1];
            String up = split[split.length - 3];
            String down = split[split.length - 2];
    
            // 4 封装
            outK.set(phone);
            outV.setUpFlow(Long.parseLong(up));
            outV.setDownFlow(Long.parseLong(down));
            outV.setSumFlow();
    
            // 5 写出
            context.write(outK, outV);
        
    
    

    编写Reducer类

    package com.Tom.mapreduce.partitioner2;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> 
        private FlowBean outV = new FlowBean();
    
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException 
    
            // 1 遍历集合累加值
            long totalUp = 0;
            long totalDown = 0;
    
            for (FlowBean value : values) 
                totalUp += value.getUpFlow();
                totalDown += value.getDownFlow();
            
    
            // 2 封装outK, outV
            outV.setUpFlow(totalUp);
            outV.setDownFlow(totalDown);
            outV.setSumFlow();
    
            // 3 写出
            context.write(key, outV);
        
    
    
    

    编写流量统计的Bean 对象

    package com.Tom.mapreduce.partitioner2;
    
    import org.apache.hadoop.io.Writable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    /**
     * 1. 定义类实现writable接口
     * 2. 重写序列化和反序列化方法
     * 3. 重写空参构造
     * 4. toString方法
     */
    public class FlowBean implements Writable
    
        private long upFlow;    // 上行流量
        private long downFlow;  // 下行流量
        private long sumFlow;   // 总流量
    
        // 空参构造
        public FlowBean() 
        
    
        public long getUpFlow() 
            return upFlow;
        
    
        public void setUpFlow(long upFlow) 
            this.upFlow = upFlow;
        
    
        public long getDownFlow() 
            return downFlow;
        
    
        public void setDownFlow(long downFlow) 
            this.downFlow = downFlow;
        
    
        public long getSumFlow() 
            return sumFlow;
        
    
        public void setSumFlow(long sumFlow) 
            this.sumFlow = sumFlow;
        
    
        public void setSumFlow() 
            this.sumFlow = this.upFlow + this.downFlow;
        
    
        @Override
        public void write(DataOutput out) throws IOException 
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        
    
        @Override
        public void readFields(DataInput in) throws IOException 
            this.upFlow = in.readLong();
            this.downFlow = in.readLong();
            this.sumFlow = in.readLong();
        
    
        @Override
        public String toString() 
            return upFlow + "\\t" + downFlow + "\\t" + sumFlow;
        
    
    
    

    编写Driver驱动类

    package com.huxili.mapreduce.partitioner2;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class FlowDirver 
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException 
    
            // 1 获取job
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            // 2 设置jar
            job.setJarByClass(FlowDirver.class);
    
            // 3 关联Mapper和Reducer
            job.setMapperClass(FlowMapper.class);
            job.setReducerClass(FlowReducer.class);
    
            // 4 设置Mapper输出的key和value类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);
    
            // 5 设置最终数据输出的key和value类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
    
            job.setPartitionerClass(ProvincePartitioner.class);
            job.setNumReduceTasks(6);
    
            // 6 设置数据的输入路径和输出路径
            FileInputFormat.setInputPaths(job, new Path("E:\\\\input\\\\inputflow"));
            FileOutputFormat.setOutputPath(job,new Path("E:\\\\hadoop\\\\output9"));
    
            // 7 提交job
            job.waitForCompletion(true);
        
    
    
    

    将ReduceTasks设置为6,可以看到前5个文件包含对应的结果,第6个文件为空。

    1.3.4 WritableComparable 排序

    排序概述

    排序是MapReduce框架中最重要的操作之一。MapTask 和ReduceTask 均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
    默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
    对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
    对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

    排序分类

    (1)部分排序
    MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。
    (2)全排序
    最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。
    (3)辅助排序:(GroupingComparator分组)
    在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。
    (4)二次排序
    在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。

    自定义排序WritableComparable 原理分析

    bean 对象做为key 传输,需要实现WritableComparable 接口重写compareTo方法,就可以实现排序。

    @Override
    public int compareTo(FlowBean bean) 
    	int result;
    	// 按照总流量大小,倒序排列
    	if (this.sumFlow > bean.getSumFlow()) 
    		result = -1;
    	else if (this.sumFlow < bean.getSumFlow()) 
    		result = 1;
    	else 
    		result = 0;
    	
    	return result;
    
    

    1.3.5 WritableComparable 排序案例实操(二次排序)

    需求分析

    编写FlowBean类

    package com.Tom

    以上是关于大数据框架之Hadoop:MapReduceMapReduce框架原理——Join多种应用的主要内容,如果未能解决你的问题,请参考以下文章

    坐实大数据资源调度框架之王,Yarn为何这么牛

    大数据技术之Hadoop(MapReduce)框架原理数据压缩

    大数据技术之Hadoop(MapReduce)框架原理数据压缩

    大数据框架之Hadoop:MapReduceMapReduce框架原理——OutputFormat数据输出

    大数据之二:Hadoop与Spark辨析

    大数据框架之Hadoop:MapReduceMapReduce框架原理——数据清洗(ETL)

    (c)2006-2024 SYSTEM All Rights Reserved IT常识