Hadoop Series: Custom Data Classes
1: Custom Data Classes
Why do we need custom data classes?
In the previous post in this series we wrote a MapReduce Top N example (https://www.cnblogs.com/wuxiaolong4/p/12733518.html). As the code there shows, fields were packed into strings with '|' as the delimiter. That approach is fragile: one careless split or wrong index reads the wrong field, or simply leaves you confused. The better option is to define a custom class, an entity of your own, much like the entity classes used when working with a database.
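To make the pitfall concrete, here is a minimal sketch; the record layout, field order, and the demo class name are illustrative, not taken from the Top N post:

public class DelimiterVsEntityDemo {
    public static void main(String[] args) {
        // Delimiter-packed record: every consumer must remember the field order
        // and remember that '|' is a regex metacharacter that needs escaping.
        String record = "20200401|12|9";              // hypothetical date|hour|temperature layout
        String[] parts = record.split("\\|");
        int temperature = Integer.parseInt(parts[2]); // an off-by-one index silently reads the wrong field
        System.out.println(temperature);

        // Entity-style access: each field has a name and a type, no index arithmetic.
        // (MinMaxtemperature is the class defined in the next section.)
        MinMaxtemperature t = new MinMaxtemperature();
        t.setDate("20200401");
        t.setMax(temperature);
        System.out.println(t.getMax());               // self-documenting and compiler-checked
    }
}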
How to define your own data class
class MinMaxtemperature implements WritableComparable<MinMaxtemperature> {
    private String date = "";
    // Start from sentinels so the first real reading always replaces them;
    // initializing min to 0 would make any positive temperature "never smaller".
    private int min = Integer.MAX_VALUE;
    private int max = Integer.MIN_VALUE;
    // WritableComparable requires a total ordering: compareTo must return
    // negative/zero/positive, not an equals-style 0/1 flag.
    public int compareTo(MinMaxtemperature other) {
        return date.compareTo(other.getDate());
    }
    // write() and readFields() must serialize the same fields in the same order,
    // date included -- otherwise the field is silently lost across the shuffle.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(date);
        out.writeInt(max);
        out.writeInt(min);
    }
    public void readFields(DataInput in) throws IOException {
        date = in.readUTF();
        max = in.readInt();
        min = in.readInt();
    }
    public String getDate() {
        return date;
    }
    public void setDate(String date) {
        this.date = date;
    }
    public int getMin() {
        return min;
    }
    public void setMin(int min) {
        this.min = min;
    }
    public int getMax() {
        return max;
    }
    public void setMax(int max) {
        this.max = max;
    }
    @Override
    public String toString() {
        return "MinMaxtemperature{" +
                "date='" + date + '\'' +
                ", min=" + min +
                ", max=" + max +
                '}';
    }
}
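Because write() and readFields() must stay symmetric, it is worth sanity-checking them with a round trip through an in-memory byte stream before wiring the class into a job. A minimal sketch (the helper class name is made up for this check; only java.io is needed):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WritableRoundTripCheck {
    public static void main(String[] args) throws IOException {
        MinMaxtemperature original = new MinMaxtemperature();
        original.setDate("20200401");
        original.setMax(9);
        original.setMin(1);

        // Serialize into a buffer, just as Hadoop serializes values for the shuffle.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize into a fresh instance; every field should survive the trip.
        MinMaxtemperature copy = new MinMaxtemperature();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(copy); // expect: MinMaxtemperature{date='20200401', min=1, max=9}
    }
}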
How to use your own class
Data (one reading per line; the first eight digits are the date, the next two the hour, and the second field is the temperature):
2020040112 1
2020040113 3
2020040114 4
2020040115 5
2020040116 6
2020040117 7
2020040118 8
2020040119 9
2020040312 1
2020040313 3
2020040314 4
2020040315 5
2020040316 6
2020040317 7
2020040318 8
2020040319 9
2020040412 1
2020040413 3
2020040414 4
2020040415 5
2020040416 6
2020040417 7
2020040418 8
2020040419 9
Code 1: output the overall maximum and minimum temperature:
package org.example;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
class MinMaxtemperature implements WritableComparable<MinMaxtemperature> {
    private String date = "";
    // Sentinels so the first real reading always replaces them; a default of 0
    // would swallow every positive minimum.
    private int min = Integer.MAX_VALUE;
    private int max = Integer.MIN_VALUE;
    public MinMaxtemperature() {
    }
    public MinMaxtemperature(String date, int max, int min) {
        this.date = date;
        this.min = min;
        this.max = max;
    }
    // Must define a real ordering (negative/zero/positive), as WritableComparable requires.
    public int compareTo(MinMaxtemperature other) {
        return date.compareTo(other.getDate());
    }
    // Serialize every field, date included, and read them back in the same order.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(date);
        out.writeInt(max);
        out.writeInt(min);
    }
    public void readFields(DataInput in) throws IOException {
        date = in.readUTF();
        max = in.readInt();
        min = in.readInt();
    }
    public String getDate() {
        return date;
    }
    public void setDate(String date) {
        this.date = date;
    }
    public int getMin() {
        return min;
    }
    public void setMin(int min) {
        this.min = min;
    }
    public int getMax() {
        return max;
    }
    public void setMax(int max) {
        this.max = max;
    }
    @Override
    public String toString() {
        return "MinMaxtemperature{" +
                "date='" + date + '\'' +
                ", min=" + min +
                ", max=" + max +
                '}';
    }
}
class WordcountMapper extends Mapper<LongWritable, Text, Text, MinMaxtemperature> {
    // Running min/max across every record this mapper sees.
    private MinMaxtemperature dataMap = new MinMaxtemperature();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] line = value.toString().split(" ");
        // Strip the trailing two hour digits, leaving just the date.
        String date = line[0].substring(0, line[0].length() - 2);
        int temperature = Integer.parseInt(line[1]);
        dataMap.setDate(date); // keeps the last date seen; it only serves as the output key here
        if (temperature > dataMap.getMax()) {
            dataMap.setMax(temperature);
        }
        if (temperature < dataMap.getMin()) {
            dataMap.setMin(temperature);
        }
    }
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit one aggregated record per mapper once all input is consumed.
        context.write(new Text(dataMap.getDate()), dataMap);
    }
}
class WordcountReducer extends Reducer<Text, MinMaxtemperature, Text, Text> {
    // Accumulates the global min/max across every reduce() call.
    private MinMaxtemperature dataMap = new MinMaxtemperature();
    @Override
    protected void reduce(Text key, Iterable<MinMaxtemperature> values, Context context) throws IOException, InterruptedException {
        String date = key.toString();
        System.out.println("reduce:" + date); // debug trace only
        for (MinMaxtemperature value : values) {
            if (value.getMax() > dataMap.getMax()) {
                dataMap.setMax(value.getMax());
            }
            if (value.getMin() < dataMap.getMin()) {
                dataMap.setMin(value.getMin());
            }
        }
    }
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // A single summary line once all groups have been processed.
        context.write(new Text("max:min"), new Text(dataMap.getMax() + ":" + dataMap.getMin()));
    }
}
public class WordcountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        String outputPath = "/software/java/data/output/";
        // MapReduce refuses to overwrite an existing output directory, so clear it first.
        if (fs.exists(new Path(outputPath))) fs.delete(new Path(outputPath), true);
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordcountDriver.class);
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MinMaxtemperature.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path("/software/java/data/input/"));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        // Submit the job's configuration, and the jar containing its classes, to YARN to run.
        //job.submit();
        boolean res = job.waitForCompletion(true);
    }
}
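Run against the sample data above, the job should leave a part-r-00000 file under /software/java/data/output/ containing one tab-separated line. The line below is what I would expect from tracing the fixed code (all readings range from 1 to 9), not output captured from the original post:

max:min	9:1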
Code 2: output the maximum and minimum temperature within each group (per date):
package org.example;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
class MinMaxtemperature implements WritableComparable<MinMaxtemperature> {
    private String date = "";
    // Sentinels so the first real reading always replaces them; a default of 0
    // would swallow every positive minimum.
    private int min = Integer.MAX_VALUE;
    private int max = Integer.MIN_VALUE;
    public MinMaxtemperature() {
    }
    public MinMaxtemperature(String date, int max, int min) {
        this.date = date;
        this.min = min;
        this.max = max;
    }
    // Must define a real ordering (negative/zero/positive), as WritableComparable requires.
    public int compareTo(MinMaxtemperature other) {
        return date.compareTo(other.getDate());
    }
    // Serialize every field, date included, and read them back in the same order.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(date);
        out.writeInt(max);
        out.writeInt(min);
    }
    public void readFields(DataInput in) throws IOException {
        date = in.readUTF();
        max = in.readInt();
        min = in.readInt();
    }
    public String getDate() {
        return date;
    }
    public void setDate(String date) {
        this.date = date;
    }
    public int getMin() {
        return min;
    }
    public void setMin(int min) {
        this.min = min;
    }
    public int getMax() {
        return max;
    }
    public void setMax(int max) {
        this.max = max;
    }
    @Override
    public String toString() {
        return "MinMaxtemperature{" +
                "date='" + date + '\'' +
                ", min=" + min +
                ", max=" + max +
                '}';
    }
}
class WordcountMapper extends Mapper<LongWritable, Text, Text, MinMaxtemperature> {
    // In-mapper combining: keep one running min/max per date instead of emitting every raw reading.
    private Map<String, MinMaxtemperature> minmaxMap = new HashMap<String, MinMaxtemperature>();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] line = value.toString().split(" ");
        // Strip the trailing two hour digits, keeping the date as the group key.
        String date = line[0].substring(0, line[0].length() - 2);
        int temperature = Integer.parseInt(line[1]);
        if (minmaxMap.containsKey(date)) {
            MinMaxtemperature existsTemperature = minmaxMap.get(date);
            if (temperature > existsTemperature.getMax()) {
                existsTemperature.setMax(temperature);
            }
            if (temperature < existsTemperature.getMin()) {
                existsTemperature.setMin(temperature);
            }
        } else {
            minmaxMap.put(date, new MinMaxtemperature(date, temperature, temperature));
        }
    }
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit one partial result per date once this mapper has seen all its input.
        for (Map.Entry<String, MinMaxtemperature> data : minmaxMap.entrySet()) {
            context.write(new Text(data.getKey()), data.getValue());
        }
    }
}
class WordcountReducer extends Reducer<Text, MinMaxtemperature, Text, Text> {
    private Map<String, MinMaxtemperature> minmaxMap = new HashMap<String, MinMaxtemperature>();
    @Override
    protected void reduce(Text key, Iterable<MinMaxtemperature> values, Context context) throws IOException, InterruptedException {
        String date = key.toString();
        System.out.println("reduce:" + date); // debug trace only
        for (MinMaxtemperature value : values) {
            if (minmaxMap.containsKey(date)) {
                MinMaxtemperature exists = minmaxMap.get(date);
                if (value.getMax() > exists.getMax()) {
                    exists.setMax(value.getMax());
                }
                if (value.getMin() < exists.getMin()) {
                    exists.setMin(value.getMin());
                }
            } else {
                // Hadoop reuses the object behind the values iterator, so store a
                // defensive copy rather than the reference itself.
                minmaxMap.put(date, new MinMaxtemperature(date, value.getMax(), value.getMin()));
            }
        }
    }
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        for (Map.Entry<String, MinMaxtemperature> data : minmaxMap.entrySet()) {
            context.write(new Text(data.getKey()), new Text(data.getValue().toString()));
        }
    }
}
public class WordcountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        String outputPath = "/software/java/data/output/";
        // MapReduce refuses to overwrite an existing output directory, so clear it first.
        if (fs.exists(new Path(outputPath))) fs.delete(new Path(outputPath), true);
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordcountDriver.class);
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MinMaxtemperature.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path("/software/java/data/input/"));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        // Submit the job's configuration, and the jar containing its classes, to YARN to run.
        //job.submit();
        boolean res = job.waitForCompletion(true);
    }
}
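Traced against the same sample data, the grouped version should emit one line per date (in HashMap iteration order, so not necessarily sorted), with the value rendered by toString(). Again, this is the expected result from reading the code rather than output captured from the original post:

20200401	MinMaxtemperature{date='20200401', min=1, max=9}
20200403	MinMaxtemperature{date='20200403', min=1, max=9}
20200404	MinMaxtemperature{date='20200404', min=1, max=9}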
Code 3 and Code 4 are omitted here, since they follow much the same pattern.