2018-07-25期 MapReduce求部门工资总和及平均工资

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了2018-07-25期 MapReduce求部门工资总和及平均工资相关的知识,希望对你有一定的参考价值。

1、Mapper类

package cn.sjq.bigdata.mr.salary;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

/**

* 求部门员工薪资总和,部门平均薪资 Map类

* 数据格式:

* SEQ EMPLOYEE_NAME COMPANY_NAME DEPT_NAME SUB_TOTAL COM_FUND_PAYAMT

  1 张三 xx有限公司 客户服务中心 5887 798

* @author songjq

*

*/

public class DeptSalaryMapper extends Mapper<LongWritable, Text, Text, Text> {

private Text tkey = new Text();

private Text tvalue = new Text();

/*

* 实现Mapper类map方法

* (non-Javadoc)

* @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN, org.apache.hadoop.mapreduce.Mapper.Context)

*/

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

//获取一行

/ 1 张三 xx有限公司 客户服务中心 5887 798

String line = value.toString();

//分词操作

String[] fileds = StringUtils.split(line, ",");

//将数据通过context传送到reduce

//设置部门作为key

tkey.set(fileds[3]);

//设置薪资及公积金作为value

tvalue.set(fileds[4]+","+fileds[5]);

context.write(tkey, tvalue);

}

}

2、Reducer类

package cn.sjq.bigdata.mr.salary;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;

import org.apache.hadoop.io.DoubleWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

/**

* 求部门员工薪资总和,部门平均薪资 Reduce类

* @author songjq

*

*/

public class DeptSalaryReducer extends Reducer<Text, Text, Text, Text> {

private Text tvalue = new Text();

/*

* 实现reducer类的reduce方法

* (non-Javadoc)

* @see org.apache.hadoop.mapreduce.Reducer#reduce(KEYIN, java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer.Context)

*/

@Override

protected void reduce(Text key, Iterable<Text> values, Context ctx)

throws IOException, InterruptedException {

//部门总薪资

float total_sal = 0;

//部门平均薪资

float avg_sal = 0;

//部门总公积金

float tatal_fond = 0;

//部门平均公积金

float avg_fond = 0;

int count = 0;

/*

* 迭代处理values的集合

*/

for(Text v:values) {

//薪资及公积金 [5887,798]

String[] salAry = StringUtils.split(v.toString(), ",");

total_sal+=new Float(salAry[0]).floatValue();

tatal_fond+=new Float(salAry[1]).floatValue();

count++;

}

//求平均工资

avg_sal = total_sal/count;

//求平均公积金

avg_fond = tatal_fond/count;

tvalue.set(","+total_sal+","+avg_sal+","+tatal_fond+","+avg_fond);

ctx.write(key, tvalue);

}

}

3、提交Job主类

package cn.sjq.bigdata.mr.salary;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**

* 求部门员工薪资总和,部门平均薪资 Main方法,提交job

* @author songjq

*

*/

public class DeptSalaryMain {

public static void main(String[] args) throws Exception {

//获取一个job实例

Job job = Job.getInstance(new Configuration());

//设置Job入口

job.setJarByClass(DeptSalaryMain.class);

//设置job Mapper类

job.setMapperClass(DeptSalaryMapper.class);

//设置job Reducer类

job.setReducerClass(DeptSalaryReducer.class);

/*

* 设置输出数据类型

*/

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(Text.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

//设置输入输出路径

FileInputFormat.setInputPaths(job, new Path("D:\test\salary\salary.csv"));

FileOutputFormat.setOutputPath(job, new Path("D:\test\salary\output"));

//提交job

job.waitForCompletion(true);

}

}


以上是关于2018-07-25期 MapReduce求部门工资总和及平均工资的主要内容,如果未能解决你的问题,请参考以下文章

MapReduce(eclipse)求2020年部门工资平均值,求2021年员工平均工资

2018-07-25期 Java序列化和反序列化编程小案例

大数据笔记——Mapreduce的高级特性(A)

MapReduce应用实例

2018-07-28期 MapReduce实现对数字排序

2018-07-30期 MapReduce对象排序(单列排序)