Big Data Systems and Technology: Hadoop Lab
A social networking site has millions of registered users. The web servers keep a log of user logins: each time a user logs in, the log file records that user's email address. The raw data file for a given day contains 8 million records in total.
The site operators want to periodically obtain the number of times each user logged in on a given day, as base data for user-behavior analysis and for drawing up effective operation plans.
After data cleansing, each record provides the user name and the access date.
- Write a program that counts accesses by date, i.e. the total number of user accesses for each calendar day.
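As a concrete (hypothetical) illustration of the cleaned data, each input line holds a user identifier and an access date separated by a comma, which is why the mapper below takes split[1] as the date:
user1@example.com,2016-01-05
user2@example.com,2016-01-05
user1@example.com,2016-01-06
For input like this, the job would produce per-day totals such as:
2016-01-05	2
2016-01-06	1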
Mapper file
package test;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text text = new Text();
        LongWritable longWritable = new LongWritable();
        // 1. Split the line of text on ","
        String[] split = value.toString().split(",");
        // 2. Build K2 (the access date) and V2 (the count 1)
        // 3. Write K2 and V2 to the context
        text.set(split[1]);
        longWritable.set(1);
        context.write(text, longWritable);
    }
}
Reducer file
package test;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        // 1. Iterate over the values and sum them to get V3
        for (LongWritable value : values) {
            count += value.get();
        }
        // 2. Write K2 and V3 to the context
        context.write(key, new LongWritable(count));
    }
}
JobMain file
package test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JobMain extends Configured implements Tool {
    // This method defines one job, from submission through saving the results
    public int run(String[] args) throws Exception {
        // 1. Create a job object
        Job job = Job.getInstance(super.getConf(), "WordCount");
        // 2. Configure the job (eight steps)
        // Step 1: specify the input format and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://localhost:9020/input"));
        // TextInputFormat.addInputPath(job, new Path("file:///Users/mac/Desktop/input"));
        // Step 2: specify the Map-phase class and data types
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);           // type of K2 emitted by the map phase
        job.setMapOutputValueClass(LongWritable.class); // type of V2 emitted by the map phase
        // Steps 3-6: use the default shuffle handling
        // Step 7: specify the Reduce-phase class and data types
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);              // type of K3 emitted by the reduce phase
        job.setOutputValueClass(LongWritable.class);    // type of V3 emitted by the reduce phase
        // Step 8: specify the output format and output path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9020/output"));
        // TextOutputFormat.setOutputPath(job, new Path("file:///Users/mac/Desktop/output"));
        // Wait for the job to finish
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Launch the job
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}
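Once the three classes are packaged into a jar (called wordcount.jar here only as a hypothetical name; the actual artifact depends on your build), the job can be submitted in the usual way:
hadoop jar wordcount.jar test.JobMain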
- Building on (1), write a program that sorts the results by access count.
Because MapReduce automatically sorts the map output keys during the shuffle, we only need to take the output of (1) and swap key and value, using the access count as the key, to get the results sorted.
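By default the LongWritable keys come out in ascending order. If the counts are wanted in descending order instead, one extra line in the JobMain below should be enough, using Hadoop's built-in comparator (a sketch, not verified against this exact setup):
job.setSortComparatorClass(LongWritable.DecreasingComparator.class);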
Mapper file
package mapreduce02;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text text = new Text();
        LongWritable longWritable = new LongWritable();
        // 1. Split the line; the output of (1) is tab-separated: date \t count
        String[] split = value.toString().split("\\t");
        // 2. Build K2 (the count) and V2 (the date), swapping key and value
        // 3. Write K2 and V2 to the context
        text.set(split[0]);
        longWritable.set(Long.parseLong(split[1]));
        context.write(longWritable, text);
    }
}
Reducer file
package mapreduce02;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // 1. Iterate over the dates that share this count
        for (Text value : values) {
            // 2. Write the date and its count back out, swapping them again
            context.write(value, key);
        }
    }
}
JobMain file
package mapreduce02;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JobMain extends Configured implements Tool {
    // This method defines one job, from submission through saving the results
    public int run(String[] args) throws Exception {
        // 1. Create a job object
        Job job = Job.getInstance(super.getConf(), "WordCount");
        // 2. Configure the job (eight steps)
        // Step 1: specify the input format and input path (this job reads the files produced in (1))
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://localhost:9020/input"));
        // TextInputFormat.addInputPath(job, new Path("file:///Users/mac/Desktop/input"));
        // Step 2: specify the Map-phase class and data types
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(LongWritable.class); // type of K2 emitted by the map phase (the count)
        job.setMapOutputValueClass(Text.class);       // type of V2 emitted by the map phase (the date)
        // Steps 3-6: use the default shuffle handling
        // Step 7: specify the Reduce-phase class and data types
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);            // type of K3 emitted by the reduce phase (the date)
        job.setOutputValueClass(LongWritable.class);  // type of V3 emitted by the reduce phase (the count)
        // Step 8: specify the output format and output path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9020/output"));
        // TextOutputFormat.setOutputPath(job, new Path("file:///Users/mac/Desktop/output"));
        // Wait for the job to finish
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Launch the job
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}
- Write a program that saves the output by month: the final results must be written to two separate files, one for January 2016 and one for February 2016, and the number of records for each month must be output as well.
The mapper tags each record with its month ("2016-01" or "2016-02") as the key, and the reducer counts the records per month and writes each month's result to its own file via MultipleOutputs. The Mapper, Reducer and JobMain files are as follows.
Mapper file:
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context ctx)
            throws IOException, InterruptedException {
        // Split the line: array[0] is the user, array[1] is the access date
        String[] array = value.toString().split(",");
        Text key_date = new Text();
        if (array[1].contains("2016-01")) {
            key_date = new Text("2016-01");
            ctx.write(key_date, new Text(array[0]));
        }
        if (array[1].contains("2016-02")) {
            key_date = new Text("2016-02");
            ctx.write(key_date, new Text(array[0]));
        }
    }
}
Reducer file
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text, Text, Text, LongWritable> {
    private MultipleOutputs<Text, LongWritable> multipleOutputs;
    @Override
    protected void setup(Context context) {
        multipleOutputs = new MultipleOutputs<Text, LongWritable>(context);
    }
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Count the records that belong to this month
        long sum = 0L;
        for (@SuppressWarnings("unused") Text v : values) {
            sum++;
        }
        // Write the month and its record count to a file named after the month
        multipleOutputs.write(key, new LongWritable(sum), key.toString());
    }
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
}
JobMain file
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JobMain extends Configured implements Tool {
    // This method defines one job, from submission through saving the results
    public int run(String[] args) throws Exception {
        // 1. Create a job object
        Job job = Job.getInstance(super.getConf(), "WordCount");
        // 2. Configure the job (eight steps)
        // Step 1: specify the input format and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://localhost:9020/input"));
        // TextInputFormat.addInputPath(job, new Path("file:///home/hadoop/Desktop/data"));
        // Step 2: specify the Map-phase class and data types
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);   // type of K2 emitted by the map phase (the month)
        job.setMapOutputValueClass(Text.class); // type of V2 emitted by the map phase (the user)
        // Steps 3-6: use the default shuffle handling
        // Step 7: specify the Reduce-phase class and data types
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);           // type of K3 emitted by the reduce phase (the month)
        job.setOutputValueClass(LongWritable.class); // type of V3 emitted by the reduce phase (the record count)
        // Step 8: specify the output format and output path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9020/output"));
        // TextOutputFormat.setOutputPath(job, new Path("file:///home/hadoop/Desktop/output"));
        // Wait for the job to finish
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Launch the job
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}
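With this configuration, MultipleOutputs writes one file per month in the output directory (named like 2016-01-r-00000 and 2016-02-r-00000), plus an empty default part-r-00000 file, because nothing is written through the normal context. If that empty file is unwanted, one option is to replace the output-format step with LazyOutputFormat (a sketch, not verified against this exact setup):
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
// instead of job.setOutputFormatClass(TextOutputFormat.class);
LazyOutputFormat.setOutputFormat(job, TextOutputFormat.class);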
2. Write a program that computes each student's average score across subjects.
Sample data (one student name and one score per line):
zhangsan 88
lisi 99
wangwu 66
zhaoliu 77
zhangsan 78
lisi 89
wangwu 96
zhaoliu 67
zhangsan 80
lisi 82
wangwu 84
zhaoliu 86
Mapper file
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<Object, Text, Text, LongWritable> {
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        Text text = new Text();
        LongWritable longWritable = new LongWritable();
        // 1. Split the line: split[0] is the student name, split[1] is the score
        //    (the sample data above is whitespace-separated)
        String[] split = value.toString().split("\\s+");
        // 2. Build K2 (the name) and V2 (the score)
        // 3. Write K2 and V2 to the context
        text.set(split[0]);
        longWritable.set(Long.parseLong(split[1]));
        context.write(text, longWritable);
    }
}
Reducer file
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // 1. Iterate over the scores, counting and summing them
        long n = 0L;
        long sum = 0L;
        LongWritable temp = new LongWritable();
        for (LongWritable value : values) {
            n++;
            sum += value.get();
        }
        // 2. Write the name and the average score to the context
        temp.set(sum / n);
        context.write(key, temp);
    }
}
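Note that sum / n is integer (long) division, so averages that are not whole numbers get truncated; for example zhaoliu's 230 / 3 comes out as 76 rather than about 76.67. If a decimal average is wanted, a DoubleWritable-based reduce is one option (a sketch; the reducer's fourth generic type and the job's output value class would also have to change to DoubleWritable):
import org.apache.hadoop.io.DoubleWritable;
// inside a Reducer<Text, LongWritable, Text, DoubleWritable>:
protected void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
    long n = 0L;
    long sum = 0L;
    for (LongWritable value : values) {
        n++;
        sum += value.get();
    }
    // Cast to double before dividing so the decimal part is kept
    context.write(key, new DoubleWritable((double) sum / n));
}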
JobMain file
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JobMain extends Configured implements Tool {
    // This method defines one job, from submission through saving the results
    public int run(String[] args) throws Exception {
        // 1. Create a job object
        Job job = Job.getInstance(super.getConf(), "WordCount");
        // 2. Configure the job (eight steps)
        // Step 1: specify the input format and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://localhost:9020/input"));
        // TextInputFormat.addInputPath(job, new Path("file:///home/hadoop/Desktop/data"));
        // Step 2: specify the Map-phase class and data types
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);           // type of K2 emitted by the map phase (the name)
        job.setMapOutputValueClass(LongWritable.class); // type of V2 emitted by the map phase (the score)
        // Steps 3-6: use the default shuffle handling
        // Step 7: specify the Reduce-phase class and data types
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);              // type of K3 emitted by the reduce phase (the name)
        job.setOutputValueClass(LongWritable.class);    // type of V3 emitted by the reduce phase (the average)
        // Step 8: specify the output format and output path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9020/output"));
        // TextOutputFormat.setOutputPath(job, new Path("file:///home/hadoop/Desktop/output"));
        // Wait for the job to finish
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Launch the job
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}
3. Aggregate the phone numbers that called each public-service number.
Sample data (one caller's number and one public-service number per line):
13718855152 112
18610117315 110
89451849 112
13718855153 110
13718855154 112
18610117315 114
18910117315 114
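With the sample above, the result should group the callers under the service number they dialed, roughly as follows (the order of numbers on each line may vary from run to run):
110	18610117315 13718855153
112	13718855152 89451849 13718855154
114	18610117315 18910117315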
Mapper file
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<Object, Text, Text, Text> {
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        Text text1 = new Text();
        Text text2 = new Text();
        // 1. Split the line of text on the space
        String[] split = value.toString().split(" ");
        // 2. Build K2 and V2
        // 3. Write K2 (the public-service number) and V2 (the caller's number) to the context
        text1.set(split[0]); // the caller's personal number
        text2.set(split[1]); // the public-service number
        context.write(text2, text1);
    }
}
Reducer file
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // 1. Concatenate all callers of this public-service number
        StringBuilder result = new StringBuilder();
        for (Text value : values) {
            result.append(value.toString()).append(" ");
        }
        // 2. Write the public-service number and its callers to the context
        context.write(key, new Text(result.toString()));
    }
}
JobMain file
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JobMain extends Configured implements Tool {
    // This method defines one job, from submission through saving the results
    public int run(String[] args) throws Exception {
        // 1. Create a job object
        Job job = Job.getInstance(super.getConf(), "WordCount");
        // 2. Configure the job (eight steps)
        // Step 1: specify the input format and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://localhost:9020/input"));
        // TextInputFormat.addInputPath(job, new Path("file:///home/hadoop/Desktop/data"));
        // Step 2: specify the Map-phase class and data types
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);   // type of K2 emitted by the map phase (the service number)
        job.setMapOutputValueClass(Text.class); // type of V2 emitted by the map phase (the caller)
        // Steps 3-6: use the default shuffle handling
        // Step 7: specify the Reduce-phase class and data types
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);      // type of K3 emitted by the reduce phase (the service number)
        job.setOutputValueClass(Text.class);    // type of V3 emitted by the reduce phase (the list of callers)
        // Step 8: specify the output format and output path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9020/output"));
        // TextOutputFormat.setOutputPath(job, new Path("file:///home/hadoop/Desktop/output"));
        // Wait for the job to finish
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Launch the job
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}