Hadoop MapReduce Examples (Sorting and Maximum Value)
Posted by 月疯

The Maven pom.xml shared by both examples:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.xuan</groupId>
<artifactId>hadoopdemo</artifactId>
<version>1.0-SNAPSHOT</version>
<name>hadoopdemo</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.5.2</version>
</dependency>
</dependencies>
</project>
Example 1: Sorting and Deduplication
package squencefile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class DisticAndSort {

    /**
     * Read each line and emit it as <line, "">.
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            context.write(value, new Text(""));
        }
    }

    /**
     * Deduplicate the lines, output format <line, "">.
     */
    public static class MyReduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Identical lines arrive grouped under one key, so writing the key once removes duplicates
            context.write(key, new Text(""));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Create a job, i.e. a runtime environment
        Configuration conf = new Configuration();
        // Cluster mode:
        // conf.set("fs.defaultFS", "hdfs://hadoop:8088");
        // Local mode:
        Job job = Job.getInstance(conf, "DisticAndSort");
        // Program entry point (for the packaged jar)
        job.setJarByClass(DisticAndSort.class);
        // Three input files
        FileInputFormat.addInputPath(job, new Path("F:\\filnk_package\\hadoop-2.10.1\\data\\test2\\file1.txt"));
        FileInputFormat.addInputPath(job, new Path("F:\\filnk_package\\hadoop-2.10.1\\data\\test2\\file2.txt"));
        FileInputFormat.addInputPath(job, new Path("F:\\filnk_package\\hadoop-2.10.1\\data\\test2\\file3.txt"));
        // Mapper logic
        job.setMapperClass(DisticAndSort.MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // The shuffle runs between map and reduce and sorts the keys, which is what sorts the lines
        // Reducer logic
        job.setReducerClass(DisticAndSort.MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Output directory (must not exist before the job runs)
        FileOutputFormat.setOutputPath(job, new Path("F:\\filnk_package\\hadoop-2.10.1\\data\\test2\\out"));
        // Run the job (on YARN when submitted to a cluster)
        boolean result = job.waitForCompletion(true);
        System.out.print(result ? 1 : 0);
    }
}
Sample input (file1.txt):
2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c
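For reference (file2.txt and file3.txt are not shown here): if file1.txt alone were the input, the shuffle would sort the lines lexicographically and the reducer would drop the duplicate 2012-3-3 c, giving:

2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c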
Example 2: Finding the Highest Temperature
Sample input (temp1.txt):
1990-01-01 -5
1990-06-18 35
1990-03-20 8
1989-05-11 23
1989-07-05 38
1990-07-01 36
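For reference (temp2.txt is not shown here): if temp1.txt alone were the input, the job below would output the hottest day of each year, namely:

1989-07-05	38.0
1990-07-01	36.0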
package squencefile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MaxTemp {

    /**
     * Map logic:
     * split the input value, extract the year, and emit <year, date:temperature>.
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Split the input line into [date, temperature]
            String line = value.toString();
            String[] lineArr = line.split(" ");
            // The year is the first four characters of the date
            String year = lineArr[0].substring(0, 4);
            // Output format: <year, date:temp>
            context.write(new Text(year), new Text(lineArr[0] + ":" + lineArr[1]));
        }
    }

    /**
     * Reduce logic: for each year, find the day with the highest temperature.
     */
    public static class Myreducer extends Reducer<Text, Text, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Track the maximum as local variables, so state cannot leak from one year's group into the next
            double maxTemp = Double.NEGATIVE_INFINITY;
            String maxDay = null;
            for (Text tempVal : values) {
                // Split the value back into [date, temperature]
                String tempStr = tempVal.toString();
                String[] tempArr = tempStr.split(":");
                long temp = Long.parseLong(tempArr[1]);
                // Keep the date only when it sets a new maximum
                if (temp > maxTemp) {
                    maxTemp = temp;
                    maxDay = tempArr[0];
                }
            }
            context.write(new Text(maxDay), new DoubleWritable(maxTemp));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Create a job, i.e. a runtime environment
        Configuration conf = new Configuration();
        // Cluster mode:
        // conf.set("fs.defaultFS", "hdfs://hadoop:8088");
        // Local mode:
        Job job = Job.getInstance(conf, "MaxTemp");
        // Program entry point (for the packaged jar)
        job.setJarByClass(MaxTemp.class);
        // Two input files
        FileInputFormat.addInputPath(job, new Path("F:\\filnk_package\\hadoop-2.10.1\\data\\test3\\temp1.txt"));
        FileInputFormat.addInputPath(job, new Path("F:\\filnk_package\\hadoop-2.10.1\\data\\test3\\temp2.txt"));
        // Mapper logic
        job.setMapperClass(MaxTemp.MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // The shuffle runs between map and reduce, grouping all of a year's values at one reducer
        // Reducer logic
        job.setReducerClass(MaxTemp.Myreducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // Output directory (must not exist before the job runs)
        FileOutputFormat.setOutputPath(job, new Path("F:\\filnk_package\\hadoop-2.10.1\\data\\test3\\out"));
        // Run the job (on YARN when submitted to a cluster)
        boolean result = job.waitForCompletion(true);
        System.out.print(result ? 1 : 0);
    }
}
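As an optional optimization that is not part of the original code, a Combiner could pre-aggregate on the map side, so each map task ships at most one <year, date:temp> pair per year across the network. A minimal sketch (a hypothetical MaxTempCombiner nested class inside MaxTemp, reusing the mapper's date:temp value format):

    // Hypothetical map-side pre-aggregation; keeps only the local maximum per year
    public static class MaxTempCombiner extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            long maxTemp = Long.MIN_VALUE;
            String maxDay = null;
            for (Text val : values) {
                String[] arr = val.toString().split(":");
                long temp = Long.parseLong(arr[1]);
                if (temp > maxTemp) {
                    maxTemp = temp;
                    maxDay = arr[0];
                }
            }
            // Re-emit in the same date:temp format the reducer already parses
            context.write(key, new Text(maxDay + ":" + maxTemp));
        }
    }

It would be registered in main() with job.setCombinerClass(MaxTemp.MaxTempCombiner.class). A Combiner is safe here because taking a maximum is associative and commutative, and its input and output types both match the map output types <Text, Text>.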