实验5 MapReduce初级编程实践——编写程序实现对输入文件的排序
Posted Z.Q.Fengᯤ⁵ᴳ
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了实验5 MapReduce初级编程实践——编写程序实现对输入文件的排序相关的知识,希望对你有一定的参考价值。
一、实验目的
- 通过实验掌握基本的MapReduce编程方法;
- 掌握用MapReduce解决一些常见的数据处理问题,包括数据去重、数据排序和数据挖掘等。
二、实验平台
- 操作系统:Linux(建议Ubuntu16.04或Ubuntu18.04)
- Hadoop版本:3.1.3
三、实验内容
编写程序实现对输入文件的排序
现在有多个输入文件,每个文件中的每行内容均为一个整数。要求读取所有文件中的整数,进行升序排序后,输出到一个新的文件中,输出的数据格式为每行两个整数,第一个数字为第二个整数的排序位次,第二个整数为原待排列的整数。下面是输入文件和输出文件的一个样例供参考。
输入文件 1 的样例如下:
33
37
12
40
输入文件 2 的样例如下:
4
16
39
5
输入文件 3 的样例如下:
1
45
25
根据输入文件 1、2 和 3 得到的输出文件C的样例如下:
1 1
2 4
3 5
4 12
5 16
6 25
7 33
8 37
9 39
10 40
11 45
四、实验步骤
进入 Hadoop 安装目录,启动 hadoop:
cd /usr/local/hadoop
sbin/start-dfs.sh
新建文件夹,创建文件 1、2 和 3:
sudo mkdir Pritice2 && cd Pritice2
sudo vim 1
sudo vim 2
sudo vim 3
编写 Java 文件实现 MapReduce:
sudo vim MergeSort.java
实现的 Java 代码如下:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class MergeSort
/**
* @param args
* 输入多个文件,每个文件中的每行内容均为一个整数
* 输出到一个新的文件中,输出的数据格式为每行两个整数,第一个数字为第二个整数的排序位次,第二个整数为原待排列的整数
*/
//map函数读取输入中的value,将其转化成IntWritable类型,最后作为输出key
public static class Map extends Mapper<Object, Text, IntWritable, IntWritable>
private static IntWritable data = new IntWritable();
public void map(Object key, Text value, Context context) throws IOException,InterruptedException
String text = value.toString();
data.set(Integer.parseInt(text));
context.write(data, new IntWritable(1));
//reduce函数将map输入的key复制到输出的value上,然后根据输入的value-list中元素的个数决定key的输出次数,定义一个全局变量line_num来代表key的位次
public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>
private static IntWritable line_num = new IntWritable(1);
public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException
for(IntWritable val : values)
context.write(line_num, key);
line_num = new IntWritable(line_num.get() + 1);
//自定义Partition函数,此函数根据输入数据的最大值和MapReduce框架中Partition的数量获取将输入数据按照大小分块的边界,然后根据输入数值和边界的关系返回对应的Partiton ID
public static class Partition extends Partitioner<IntWritable, IntWritable>
public int getPartition(IntWritable key, IntWritable value, int num_Partition)
int Maxnumber = 65223;//int型的最大数值
int bound = Maxnumber/num_Partition+1;
int keynumber = key.get();
for (int i = 0; i<num_Partition; i++)
if(keynumber<bound * (i+1) && keynumber>=bound * i)
return i;
return -1;
public static void main(String[] args) throws Exception
// TODO Auto-generated method stub
Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://localhost:9000");
String[] otherArgs = new String[]"input","output"; /* 直接设置输入参数 */
if (otherArgs.length != 2)
System.err.println("Usage: wordcount <in><out>");
System.exit(2);
Job job = Job.getInstance(conf,"Merge and sort");
job.setJarByClass(MergeSort.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setPartitionerClass(Partition.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
赋予用户相关权限:
sudo chown -R hadoop /usr/local/hadoop
添加编译所需要使用的 jar 包:
vim ~/.bashrc
添加下面一行到文件的最后:
export HADOOP_HOME=/usr/local/hadoop
export CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath):$CLASSPATH
使更改立即生效:
source ~/.bashrc
编译 MergeSort.java:
javac MergeSort.java
打包生成的 class 文件为 jar 包:
jar -cvf MergeSort.jar *.class
创建 Hadoop 主目录为 /user/hadoop 并创建 input 文件夹:
/usr/local/hadoop/bin/hdfs dfs -mkdir -p /user/hadoop
/usr/local/hadoop/bin/hdfs dfs -mkdir input
若 intput 已存在则删除原有文件:
/usr/local/hadoop/bin/hdfs dfs -rm input/*
上传 1、2 和 3 文件到 input 文件夹中:
/usr/local/hadoop/bin/hdfs dfs -put ./1 input
/usr/local/hadoop/bin/hdfs dfs -put ./2 input
/usr/local/hadoop/bin/hdfs dfs -put ./3 input
使用之前确保 output 文件夹不存在:
/usr/local/hadoop/bin/hdfs dfs -rm -r output
使用我们刚生成的 Merge.jar 包:
/usr/local/hadoop/bin/hadoop jar MergeSort.jar MergeSort
查看输出结果:
/usr/local/hadoop/bin/hdfs dfs -cat output/*
输出如下:
hadoop@fzqs-Laptop:/usr/local/hadoop$ hdfs dfs -cat output/*
1 1
2 4
3 5
4 12
5 16
6 25
7 33
8 37
9 39
10 40
11 45
hadoop@fzqs-Laptop:/usr/local/hadoop$
此外,有想用 Python 写的可以参考我这篇博客:实验5 MapReduce初级编程实践(Python实现)
以上是关于实验5 MapReduce初级编程实践——编写程序实现对输入文件的排序的主要内容,如果未能解决你的问题,请参考以下文章
实验5 MapReduce初级编程实践——编程实现文件合并和去重操作
实验5 MapReduce初级编程实践——编程实现文件合并和去重操作