Big Data Systems and Technologies: Hadoop Lab

Posted by 就是木子呀


A social-networking site has over a million registered users, and its web servers keep a log of user logins: each time a user logs in, the log file records that user's e-mail address. The raw data file for one particular day contains about 8 million records.

The operator wants to obtain, at regular intervals, the number of times users logged in on a given day, as base data for user-behaviour analysis and for drawing up an effective operation plan.

After data cleaning, each record contains a user name and an access date (the code below assumes the two fields are separated by a comma).

(1) Write a program that counts accesses by date, i.e. obtains the total number of user accesses for each calendar day.

Mapper file

package test;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text text = new Text();
        LongWritable longWritable = new LongWritable();
        // 1. Split the line of text ("username,date")
        String[] split = value.toString().split(",");
        // 2. Build K2 (the date) and V2 (a count of 1)
        // 3. Write K2,V2 to the context
        text.set(split[1]);
        longWritable.set(1);
        context.write(text, longWritable);
    }
}

Reducer file

package test;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        // 1. Iterate over the values and sum them to get V3
        for (LongWritable value : values) {
            count += value.get();
        }
        // 2. Write K2 and V3 to the context
        context.write(key, new LongWritable(count));
    }
}

JobMain file

package test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {
    // This method defines one job, from submission through saving the results
    public int run(String[] args) throws Exception {
        // 1. Create the job object
        Job job = Job.getInstance(super.getConf(), "WordCount");

        // 2. Configure the job (eight steps)

        // Step 1: input format and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://localhost:9020/input"));
        // TextInputFormat.addInputPath(job, new Path("file:///Users/mac/Desktop/input"));
        // Step 2: Map class and its output types
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);           // type of K2
        job.setMapOutputValueClass(LongWritable.class); // type of V2
        // Steps 3-6: use the default shuffle handling
        // Step 7: Reduce class and its output types
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);              // type of K3
        job.setOutputValueClass(LongWritable.class);    // type of V3
        // Step 8: output format and output path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9020/output"));
        // TextOutputFormat.setOutputPath(job, new Path("file:///Users/mac/Desktop/output"));
        // Wait for the job to finish
        boolean b = job.waitForCompletion(true);

        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Launch the job
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}
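The input and output paths in run() are hard-coded for the lab environment. A hedged variant (not in the original code; the jar name below is only an example) is to read the paths from the command line, which ToolRunner passes through to run():

        // In run(), instead of the hard-coded paths:
        TextInputFormat.addInputPath(job, new Path(args[0]));
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

The job can then be submitted with something like hadoop jar hadoop-lab.jar test.JobMain /input /output. Whichever way the paths are set, the output directory must not exist before the job starts, or TextOutputFormat will abort the job.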

(2) Building on (1), sort the results by access count.

Because MapReduce sorts the map output keys by default during the shuffle, all we need to do on top of (1) is emit the value produced there (the access count) as the key; the framework then does the sorting for us.
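The shuffle sorts LongWritable keys in ascending order, so the dates with the fewest accesses come first. If a descending order is expected, one hedged option (not in the original code) is to set a decreasing comparator in the JobMain run() method shown further below:

        // Optional: sort the LongWritable keys from largest to smallest.
        job.setSortComparatorClass(LongWritable.DecreasingComparator.class);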

Mapper file

package mapreduce02;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text text = new Text();
        LongWritable longWritable = new LongWritable();
        // 1. Split the line produced by (1): "date<TAB>count"
        String[] split = value.toString().split("\\t");
        // 2. Build K2,V2 (swap: the count becomes the key)
        // 3. Write K2,V2 to the context
        text.set(split[0]);
        longWritable.set(Long.parseLong(split[1]));
        context.write(longWritable, text);
    }
}

Reducer file

package mapreduce02;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // 1. Iterate over the dates that share this access count
        for (Text value : values) {
            // 2. Write the date back as the key and the count as the value
            context.write(value, key);
        }
    }
}

JobMain file

package mapreduce02;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {
    // This method defines one job, from submission through saving the results
    public int run(String[] args) throws Exception {
        // 1. Create the job object
        Job job = Job.getInstance(super.getConf(), "WordCount");

        // 2. Configure the job (eight steps)

        // Step 1: input format and input path (for this job the input should be the output of job (1))
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://localhost:9020/input"));
        // TextInputFormat.addInputPath(job, new Path("file:///Users/mac/Desktop/input"));
        // Step 2: Map class and its output types
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);   // type of K2
        job.setMapOutputValueClass(Text.class);         // type of V2
        // Steps 3-6: use the default shuffle handling
        // Step 7: Reduce class and its output types
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);              // type of K3
        job.setOutputValueClass(LongWritable.class);    // type of V3
        // Step 8: output format and output path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9020/output"));
        // TextOutputFormat.setOutputPath(job, new Path("file:///Users/mac/Desktop/output"));
        // Wait for the job to finish
        boolean b = job.waitForCompletion(true);

        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Launch the job
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}
(3) Write a program that saves the output by month: the final results must be written to two separate files, one for January 2016 and one for February 2016, and the number of records for each of the two months must also be output.

Mapper file
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context ctx) throws IOException, InterruptedException {
        // Split the line: "username,date"
        String[] array = value.toString().split(",");
        Text key_date = new Text();
        // Use the month as the key so all records of one month land in one reduce group
        if (array[1].contains("2016-01")) {
            key_date = new Text("2016-01");
            ctx.write(key_date, new Text(array[0]));
        }
        if (array[1].contains("2016-02")) {
            key_date = new Text("2016-02");
            ctx.write(key_date, new Text(array[0]));
        }
    }
}

Reducer file

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, Text, Text, LongWritable> {

    private MultipleOutputs<Text, LongWritable> multipleOutputs;

    @Override
    protected void setup(Context context) {
        multipleOutputs = new MultipleOutputs<Text, LongWritable>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        long sum = 0L;
        // Count the records belonging to this month
        for (@SuppressWarnings("unused") Text v : values) {
            sum++;
        }
        // Write (month, record count) to a file named after the month, e.g. 2016-01-r-00000
        multipleOutputs.write(key, new LongWritable(sum), key.toString());
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
}

JobMain file

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {
    // This method defines one job, from submission through saving the results
    public int run(String[] args) throws Exception {
        // 1. Create the job object
        Job job = Job.getInstance(super.getConf(), "WordCount");

        // 2. Configure the job (eight steps)

        // Step 1: input format and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://localhost:9020/input"));
        // TextInputFormat.addInputPath(job, new Path("file:///home/hadoop/Desktop/data"));
        // Step 2: Map class and its output types
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);           // type of K2
        job.setMapOutputValueClass(Text.class);         // type of V2
        // Steps 3-6: use the default shuffle handling
        // Step 7: Reduce class and its output types
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);              // type of K3
        job.setOutputValueClass(LongWritable.class);    // type of V3 (the reducer emits LongWritable record counts)
        // Step 8: output format and output path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9020/output"));
        // TextOutputFormat.setOutputPath(job, new Path("file:///home/hadoop/Desktop/output"));
        // Wait for the job to finish
        boolean b = job.waitForCompletion(true);

        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Launch the job
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}
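One practical note on MultipleOutputs: with the three-argument write(key, value, baseOutputPath) used in the reducer, the records land in files named 2016-01-r-00000 and 2016-02-r-00000, but the job's default output still creates empty part-r-00000 files. A hedged optional tweak (not in the original code) is to register the output format lazily in run():

        // Replace job.setOutputFormatClass(TextOutputFormat.class) with:
        org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);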

2. Write a program that computes each student's average score across subjects.

Sample data (one student name and one score per line):

zhangsan 88
lisi 99
wangwu 66
zhaoliu 77
zhangsan 78
lisi 89
wangwu 96
zhaoliu 67
zhangsan 80
lisi 82
wangwu 84
zhaoliu 86

Mapper file

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<Object, Text, Text, LongWritable> {

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        Text text = new Text();
        LongWritable longWritable = new LongWritable();
        // 1. Split the line into name and score (the sample data is whitespace-separated)
        String[] split = value.toString().split("\\s+");
        // 2. Build K2 (the student name) and V2 (the score)
        // 3. Write K2,V2 to the context
        text.set(split[0]);
        longWritable.set(Long.parseLong(split[1]));
        context.write(text, longWritable);
    }
}

Reducer file

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // 1. Iterate over this student's scores, counting and summing them
        long n = 0L;
        long sum = 0L;
        LongWritable temp = new LongWritable();
        for (LongWritable value : values) {
            n++;
            sum += value.get();
        }
        // 2. Write the student name and the (integer) average
        temp.set(sum / n);
        context.write(key, temp);
    }
}

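Note that sum / n above is integer (long) division, so the average is truncated. If a fractional average is wanted, a hedged variant of the reduce body (not in the original code) could emit a DoubleWritable instead; the reducer would then be declared Reducer<Text, LongWritable, Text, DoubleWritable> and JobMain would call job.setOutputValueClass(DoubleWritable.class):

        // Requires: import org.apache.hadoop.io.DoubleWritable;
        long n = 0L;
        long sum = 0L;
        for (LongWritable value : values) {
            n++;
            sum += value.get();
        }
        // Cast to double before dividing to keep the fractional part
        context.write(key, new DoubleWritable((double) sum / n));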

JobMain file

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {
    // This method defines one job, from submission through saving the results
    public int run(String[] args) throws Exception {
        // 1. Create the job object
        Job job = Job.getInstance(super.getConf(), "WordCount");

        // 2. Configure the job (eight steps)

        // Step 1: input format and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://localhost:9020/input"));
        // TextInputFormat.addInputPath(job, new Path("file:///home/hadoop/Desktop/data"));
        // Step 2: Map class and its output types
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);           // type of K2
        job.setMapOutputValueClass(LongWritable.class); // type of V2
        // Steps 3-6: use the default shuffle handling
        // Step 7: Reduce class and its output types
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);              // type of K3
        job.setOutputValueClass(LongWritable.class);    // type of V3
        // Step 8: output format and output path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9020/output"));
        // TextOutputFormat.setOutputPath(job, new Path("file:///home/hadoop/Desktop/output"));
        // Wait for the job to finish
        boolean b = job.waitForCompletion(true);

        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Launch the job
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}


3. Compile the call records for public service numbers (group callers by the service number they dialled).

Sample data (caller number and public service number, space-separated, one call per line):

13718855152 112
18610117315 110
89451849 112
13718855153 110
13718855154 112
18610117315 114
18910117315 114

Mapper file

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<Object, Text, Text, Text> {

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        Text text1 = new Text();
        Text text2 = new Text();
        // 1. Split the line into caller number and public service number
        String[] split = value.toString().split(" ");
        // 2. Build K2,V2
        // 3. Write K2,V2 to the context
        text1.set(split[0]); // caller's phone number
        text2.set(split[1]); // public service number
        context.write(text2, text1);
    }
}

Reducer file

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // 1. Concatenate all callers of this public service number
        StringBuilder result = new StringBuilder();
        for (Text value : values) {
            result.append(value.toString()).append(" ");
        }
        context.write(key, new Text(result.toString()));
    }
}

JobMain file

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {
    // This method defines one job, from submission through saving the results
    public int run(String[] args) throws Exception {
        // 1. Create the job object
        Job job = Job.getInstance(super.getConf(), "WordCount");

        // 2. Configure the job (eight steps)

        // Step 1: input format and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://localhost:9020/input"));
        // TextInputFormat.addInputPath(job, new Path("file:///home/hadoop/Desktop/data"));
        // Step 2: Map class and its output types
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);   // type of K2
        job.setMapOutputValueClass(Text.class); // type of V2
        // Steps 3-6: use the default shuffle handling
        // Step 7: Reduce class and its output types
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);      // type of K3
        job.setOutputValueClass(Text.class);    // type of V3
        // Step 8: output format and output path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9020/output"));
        // TextOutputFormat.setOutputPath(job, new Path("file:///home/hadoop/Desktop/output"));
        // Wait for the job to finish
        boolean b = job.waitForCompletion(true);

        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Launch the job
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}
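All of the JobMain classes above hard-code the same hdfs://localhost:9020/output path, and TextOutputFormat refuses to run when that directory already exists. A hedged convenience sketch (not part of the original lab code) that could be placed at the top of run(), before the output path is set, deletes a leftover output directory so the job can be re-run:

        // Assumption: same hard-coded output path as in the JobMain classes above.
        Path outputPath = new Path("hdfs://localhost:9020/output");
        org.apache.hadoop.fs.FileSystem fs =
                org.apache.hadoop.fs.FileSystem.get(outputPath.toUri(), super.getConf());
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true); // true = delete recursively
        }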

