大数据学习之九——Combiner,Partitioner,shuffle和MapReduce排序分组

Posted M_study

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据学习之九——Combiner,Partitioner,shuffle和MapReduce排序分组相关的知识,希望对你有一定的参考价值。

1.Combiner

Combiner是MapReduce的一种优化手段。每一个map都可能会产生大量的本地输出,Combiner的作用就是对map端的输出先做一次合并,以减少map和reduce结点之间的数据传输量,以提高网络IO性能。只有操作满足结合律的才可设置combiner。

Combiner的作用:

(1)Combiner实现本地key的聚合,对map输出的key排序value进行迭代:如图所示:

map: (K1, V1) → list(K2, V2) combine: (K2, list(V2)) → list(K2, V2)  reduce: (K2, list(V2)) → list(K3, V3)

 (2)Combiner还有本地reduce功能(其本质上就是一个reduce)
         例如wordcount的例子和找出value的最大值的程序 ,combiner和reduce完全一致,如下所示:

 map: (K1, V1) → list(K2, V2) combine: (K2, list(V2)) → list(K3, V3) reduce: (K3, list(V3)) → list(K4, V4)

使用combiner之后,先完成的map会在本地聚合,提升速度。对于hadoop自带的wordcount的例子,value就是一个叠加的数字,所以map一结束就可以进行reduce的value叠加,而不必要等到所有的map结束再去进行reduce的value叠加。

在实际的Hadoop集群操作中,我们是由多台主机一起进行MapReduce的,如果加入规约操作,每一台主机会在reduce之前进行一次对本机数据的规约,然后在通过集群进行reduce操作,这样就会大大节省reduce的时间,从而加快MapReduce的处理速度。

2.Partitioner

step1.3就是分区操作,哪个key到哪个reducer的分配过程,是由Partitioner规定的。

用户在中间key上使用分区函数来对数据进行分区,之后在输入到后续任务执行进程。一个默认的分区函数式使用hash方法(比如常见的:hash(key) mod R)进行分区。hash方法能够产生非常平衡的分区。

自定制Partitioner函数:

package mapreduce01;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Partitioner;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class fenqu {      

static String INPUT_PATH="hdfs://master:9000/test";  

static String OUTPUT_PATH="hdfs://master:9000/output/fenqu";    

static class MyMapper extends Mapper<Object,Object,IntWritable,NullWritable>{  

 IntWritable output_key=new IntWritable();   

NullWritable output_value=NullWritable.get();   

protected void map(Object key, Object value, Context context) throw IOException,InterruptedException{        

int val=Integer.parseUnsignedInt(value.toString().trim());    

output_key.set(val);    

context.write(output_key,output_value);   

}  

}    

static class LiuPartitioner extends Partitioner<IntWritable,NullWritable> {   

@Override   

public int getPartition(IntWritable key, NullWritable value, int numPartitions){    

int num=key.get();    

if(num>100)     return 0;    

else     return 1;                           

}   

 }  

 static class MyReduce extends Reducer<IntWritable,NullWritable,IntWritable,IntWritable>{   

IntWritable output_key=new IntWritable();     

int num=1;     

protected void reduce(IntWritable key,Iterable<NullWritable> values,Context context) throws IOException,InterruptedException{        

output_key.set(num++);    

context.write(output_key,key);    

}   }    

public static void main(String[] args) throws Exception{   

Path outputpath=new Path(OUTPUT_PATH);   

Configuration conf=new Configuration();     |

FileInputFormat.setInputPaths(job, INPUT_PATH);   

FileOutputFormat.setOutputPath(job,outputpath);      

job.setMapperClass(MyMapper.class);  

 job.setReducerClass(MyReduce.class);     

 job.setNumReduceTasks(2);   

job.setPartitionerClass(LiuPartitioner.class);     

 job.setMapOutputKeyClass(IntWritable.class);  

 job.setMapOutputValueClass(NullWritable.class);     

 job.setOutputKeyClass(IntWritable.class);  

 job.setOutputValueClass(IntWritable.class);     

 job.waitForCompletion(true);  

}

}

分区Partitioner主要作用在于以下两点:
 根据业务需要,产生多个输出文件;多个reduce任务并发运行,提高整体job的运行效率。

3.Shuffle过程

reduce阶段的三个步骤:

 step2.1就是一个shuffle【随机、洗牌】操作

shuffle是什么:针对多个map任务的输出按照不同的分区(Partition)通过网络复制到不同的reduce任务节点上,这个过程就称作为Shuffle。

在map端:

1.在map端首先是InputSplit,在InputSplit中含有DataNode中的数据,每一个InputSplit都会分配一个Mapper任务,Mapper任务结束后产生<K2,V2>的输出,这些输出先存放在缓存中,每个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spil l.percent),一个后台线程就把内容写到(spill)Linux本地磁盘中的指定目录(mapred.local.dir)下的新建的一个溢出写文件。

2.写磁盘前,要进行partition、sort和combine等操作。通过分区,将不同类型的数据分开处理,之后对不同分区的数据进行排序,如果有Combiner,还要对排序后的数据进行combine。等最后记录写完,将全部溢出文件合并为一个分区且排序的文件。

3.最后将磁盘中的数据送到Reduce中,图中Map输出有三个分区,有一个分区数据被送到图示的Reduce任务中,剩下的两个分区被送到其他Reducer任务中。而图示的Reducer任务的其他的三个输入则来自其他节点的Map输出。

reduce端:

1. Copy阶段:Reducer通过Http方式得到输出文件的分区。
reduce端可能从n个map的结果中获取数据,而这些map的执行速度不尽相同,当其中一个map运行结束时,reduce就会从JobTracker中获取该信息。map运行结束后TaskTracker会得到消息,进而将消息汇报给JobTracker,reduce定时从JobTracker获取该信息,reduce端默认有5个数据复制线程从map端复制数据。

2.Merge阶段:如果形成多个磁盘文件会进行合并
从map端复制来的数据首先写到reduce端的缓存中,同样缓存占用到达一定阈值后会将数据写到磁盘中,同样会进行partition、combine、排序等过程。如果形成了多个磁盘文件还会进行合并,最后一次合并的结果作为reduce的输入而不是写入到磁盘中。

3.Reducer的参数:最后将合并后的结果作为输入传入Reduce任务中。

4.排序sort

step4.1第四步中需要对不同分区中数据进行排序和分组,默认情况按照key进行排序和分组。

自定义类型MyGrouptestt实现了WritableComparable的接口,该接口中有一个compareTo()方法,当对key进行比较时会调用该方法,而我们将其改为了我们自己定义的比较规则,从而实现我们想要的效果。

自定义排序:

GroupSort.java

package mapreduce01;

import java.io.IOException;

import mapreduce01.fenqu.LiuPartitioner;

import mapreduce01.fenqu.MyMapper;

import mapreduce01.fenqu.MyReduce;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GroupSort {     

static String INPUT_PATH="hdfs://master:9000/input/f.txt";  

static String OUTPUT_PATH="hdfs://master:9000/output/groupsort";  

 static class MyMapper extends Mapper<Object,Object,MyGrouptest,NullWritable>{  

 MyGrouptest output_key=new MyGrouptest();   

NullWritable output_value=NullWritable.get();   

protected void map(Object key, Object value, Context context) throws IOException, InterruptedException{    

String[] tokens=value.toString().split(",",2);   

 MyGrouptest output_key=new MyGrouptest(Long.parseLong(tokens[0]), Long.parseLong(tokens[1]));  

context.write(output_key,output_value);   

}  

}  

 static class MyReduce extends Reducer<MyGrouptest,NullWritable,LongWritable,LongWritable>{   

LongWritable output_key=new LongWritable();  

 LongWritable output_value=new LongWritable();    

protected void reduce(MyGrouptest key,Iterable<NullWritable> values,Context context) throws IOException,InterruptedException{    

output_key.set(key.getFirstNum());    

output_value.set(key.getSecondNum());    

context.write(output_key,output_value);    

}   

}    

public static void main(String[] args) throws Exception{   

Path outputpath=new Path(OUTPUT_PATH);   

Configuration conf=new Configuration();      

Job job=Job.getInstance(conf);   

FileInputFormat.setInputPaths(job, INPUT_PATH);  

 FileOutputFormat.setOutputPath(job,outputpath);     

 job.setMapperClass(MyMapper.class);  

 job.setReducerClass(MyReduce.class);     

 job.setNumReduceTasks(1);  

 job.setPartitionerClass(LiuPartitioner.class);      

job.setMapOutputKeyClass(MyGrouptest.class);   

job.setMapOutputValueClass(NullWritable.class);     

 job.setOutputKeyClass(LongWritable.class);   

job.setOutputValueClass(LongWritable.class);     

 job.waitForCompletion(true);  

}

}

MyGrouptest.java

package mapreduce01;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class MyGrouptest implements WritableComparable<MyGrouptest> {       

 long firstNum;         

 long secondNum;        

public MyGrouptest() {}       

public MyGrouptest(long first, long second) {             

 firstNum = first;              

secondNum = second;       

 }        

@Override         

public void write(DataOutput out) throws IOException {              

out.writeLong(firstNum);              

out.writeLong(secondNum);         

}        

@Override       

 public void readFields(DataInput in) throws IOException {              

firstNum = in.readLong();              

secondNum = in.readLong();       

 }    /*         * 当key进行排序时会调用以下这个compreTo方法         */       

 @Override         

public int compareTo(MyGrouptest anotherKey) {              

 long min = firstNum - anotherKey.firstNum;              

 if (min != 0) {                // 说明第一列不相等,则返回两数之间小的数                   

 return (int) min;               

}    

else {                   

 return (int) (secondNum - anotherKey.secondNum);              

 }         

}     

public long getFirstNum() {   return firstNum;  }  

public long getSecondNum() {   return secondNum;  }

}

 

以上是关于大数据学习之九——Combiner,Partitioner,shuffle和MapReduce排序分组的主要内容,如果未能解决你的问题,请参考以下文章

python学习之第九课时--基本数据类型(list)

Android学习之基础知识九 — 数据存储(持久化技术)之SQLite数据库存储

Android学习之基础知识九 — 数据存储(持久化技术)之使用LitePal操作数据库

PGL图学习之基于UniMP算法的论文引用网络节点分类任务[系列九]

Python 学习之《Learn Python3 The Hard Way 》第九部分学习笔记

PGL图学习之基于GNN模型新冠疫苗任务[系列九]