大数据阶段划分及案例单词统计

Posted xiayangdream

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据阶段划分及案例单词统计相关的知识,希望对你有一定的参考价值。

大数据阶段的重要课程划分

离线分析 : hadoop生态圈 HDFS, MapReduce(概念偏多), hive(底层是MapReduce), 离线业务分析80%都是使用hive

实时分析 : spark

数据结构 : 二叉树(面试) 动态规划, redis数据库, SSM三大框架
	1. spring
	2. springMVC
	3. mybatis

HDFSAPI

HDFS创建目录

@Test
	public void mkdirTest() throws IOException {
		
		//1 获得文件系统的连接
		
		Configuration config=new Configuration();
		
		config.set("fs.defaultFS", "hdfs://10.0.152.47:8020");
		
		FileSystem fs=FileSystem.get(config);
		
		
		//2 操作HDFS API  创建目录 
		fs.mkdirs(new Path("/customMk"));
		
		//3 关闭资源
		
		fs.close();
		
		System.out.println("创建完成");
		
	}

HDFS删除目录

@Test
	public void deleteMk() throws IOException {
		
		//1 获得文件系统的连接
		
		Configuration config=new Configuration();
		
		config.set("fs.defaultFS", "hdfs://10.0.152.47:8020");
		
		FileSystem fs=FileSystem.get(config);
		
		
		//2 操作HDFS API  创建目录 
		fs.delete(new Path("/customMk"),true);
		//3 关闭资源
		
		fs.close();
		
		System.out.println("删除完成");
		
	}

HDFS修改文件名

	@Test
	public void testRename() throws IOException {
		
		//1 获得文件系统的连接
		
		Configuration config=new Configuration();
		
		config.set("fs.defaultFS", "hdfs://10.0.152.47:8020");
		
		FileSystem fs=FileSystem.get(config);
		
		
		//2 操作HDFS API  修改名称
		fs.rename(new Path("/a.txt"), new Path("/d.txt"));
		//3 关闭资源
		
		fs.close();
		
		System.out.println("修改完成");
		
	}

获得文件详细信息

@Test
	public void getFileInfo() throws IOException {
		
		//1 获得文件系统的连接
		
		Configuration config=new Configuration();
		
		config.set("fs.defaultFS", "hdfs://10.0.152.47:8020");
		
		FileSystem fs=FileSystem.get(config);
		
		/*
		 * 
		 *  while(String str:strs){
		 *  	
		 *  	System.out.println(str);
		 *  }
		 * 
		 * */
	    
		 RemoteIterator<LocatedFileStatus> listfiles=fs.listFiles(new Path("/"), true);
		 
		 while(listfiles.hasNext()) {
			 
			 LocatedFileStatus status=listfiles.next();
			 //输出文件名称
			 System.out.println(status.getPath().getName());
			 //输出文件块
			 System.out.println(status.getBlockSize());
			 
			 //文件的长度
			 System.out.println(status.getLen());
			 
			 //文件的权限
			 System.out.println(status.getPermission());
			 
			 //文件的副本数量
			 System.out.println(status.getReplication());
			 
			 //文件的所属者
			 System.out.println(status.getOwner());
		 }
		
		
	}

hadoop环境变量

  1. 解压hadoop2.7.2到一个非中文的路径下
  2. 为hadoop设置环境变量** HADOOP_HOME**
  3. 配置path路径(不详细介绍,网上很多)%HADOOP_HOME%/bin
  4. 把编译过后的本地库文件拷贝到hadoop目录下的bin文件夹下

通过IO流操作HDFS

字符流和字节流

  • 字节流: 字节流什么都能读 , 字节, 图片 , 音乐, 视频, 文件
  • 字符流: 只能读取文本
  • 字节流直接操作的是文件的本身, 字符流操作之中还存在缓冲区, 使用普通字节流处理中文的时候想要读取很多行可能出现乱码
  • 使用字节流操作文本的时候, 如果不关闭资源, 同样可以把内容输出到文本中, 但是使用字符流操作文本的话, 如果没有关闭资源, 或者没有清空缓存区. 内容时不会输出到文本中的

IO流上传

@Test
	public void filePut() throws IllegalArgumentException, IOException {
		
		//1 获得文件系统
		Configuration config=new Configuration();
		
		config.set("fs.defaultFS", "hdfs://10.0.152.47:8020");
		
		FileSystem fs=FileSystem.get(config);
		
		// 2创建输入流
		
		FileInputStream  input=new FileInputStream(new File("c:/hello.txt"));
		
		// 3获得输出流
		
		FSDataOutputStream  fos=fs.create(new Path("/hello21.txt"));
		
		
		// 流拷贝 把input中的内容交给 output输出
		IOUtils.copyBytes(input, fos, config);
		
		
		// 5关闭资源 
		
		IOUtils.closeStream(input);
		
		IOUtils.closeStream(fos);

	}

IO流下载

@Test
	public void getFile() throws IllegalArgumentException, IOException {
		
		//1 获得文件系统
		Configuration config=new Configuration();
		
		config.set("fs.defaultFS", "hdfs://10.0.152.47:8020");
		
		FileSystem fs=FileSystem.get(config);
		
		//2 获得输入流,从HDFS读取 
		
		FSDataInputStream input=fs.open(new Path("/hello21.txt"));
		
		//3 获得输出流 
		
		FileOutputStream output=new FileOutputStream(new File("c:/hello21.txt"));
		
		
		//4 流拷贝
		
		IOUtils.copyBytes(input, output, config);
		
		IOUtils.closeStream(input);
		IOUtils.closeStream(output);

	}

MapReduce核心思想

  • MapReduce优点:
    • 编写分布式程序简单, 可以简单的实现两个接口, 皆可以开发出一套分布式计 算程序
    • 由于是在hadoop上运行的, 可以很方便的横向扩展
  • MapReduce缺点:
    • 延迟高, 不能实时进行分析处理, 只能做离线分析
  • MapReduce分阶段处理:
    • 文本文件 --> inputFormat --> map --> shuffle --> reduce --> outputFormat

WordCount单词统计案例

map阶段
对数据进行切片, 每一个切片对应一个mapTask , 假如一个文件被切成了10个切片, 就存在10个mapTask任务并行运行, 互不干扰
reduce阶段
把每个mapTask阶段的输出进行整合

技术分享图片

步骤:

  1. 文本 : inputFormat 输出到 map<K,V>

    java python hadoop

    hdfs html css

    javascript scala

    scala css hdfs

  2. FileInputFormat 会读取一行文本输出给map 切分为 <>k ,v > 对, key 是文本中的偏移量, V 是文本中的内容 <0, java python hadoop>

    <1, hdfs html css >

    <2, javascript scala > ... <10 , scala css hdfs >

    String word = value.toString(); ---->java python hadoop String[ ] words = word.split(" ");----> [java , python , hadoop]

  3. 接下来 , map 要输出给reduce <K, V > K:文本的内容, V: 单词出现的次数

  4. Reduce进行接收 <K, V > K:文本的内容, V: 单词出现的次数

在整个MapReduce运行的时候存在如下进程: 1. mapTask 2. reduceTask 3. MRAPPMaster 任务管理的进程

hadoop的序列化

如果要进行对象的传输, 则传输的内容必须进行序列化, 所以hadoop就创建了一些序列化类型

对象序列化
Long longWritable
Int IntWrieable
String Text

Wordcount代码

本地运行

1. 编写WordCountMap
package org.qianfeng.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * 
 * @author wubo
 * 		
 *Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 *
 *输入 key 文本中偏移量 
 *value 文本中的内容
 *
 *输出 key 是文本的内容 
 *
 *value 是单词出现的次数 
 */
public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable>{

	private Text k=new Text();
	
	@Override
	protected void map(LongWritable key, Text value,Context context)
			throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		
		//1 获取一行的数据 
		
		String line=value.toString();
		
		//2 切割  按照空格切分
		
		String[] words=line.split(" ");
		
		for(String word:words) {
			
			k.set(word);   //把String类型的word 转换为Text类型
			//3 输出到Reduce 
			context.write(k, new IntWritable(1));
		}
	}
	//需要实现Map方法编写业务逻辑
}
2. 编写WordCountReduce
package org.qianfeng.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * 
 * @author wubo
 *
 *
 *hello 1
 *hadoop 1
 *
 *hadoop 1
 *
 *hadoop 2
 *
 *把相同key的values进行累加  
 */

public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,
			Context context) throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		
		int sum=0; 
		
		for(IntWritable count:values) {
			
			sum+=count.get();
			
		}
		
		//输出
		context.write(key, new IntWritable(sum));
		
	}
  
}

3. 编写Driver
package org.qianfeng.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		
		
		//1 获得配置信息 
		
		Configuration config=new Configuration();
		// 实例化 job并且把配置信息传给job
		Job job=Job.getInstance(config);
		
		// 通过反射机制 加载主类的位置
		job.setJarByClass(Driver.class);
		
		//设置map和reduce类
		job.setMapperClass(WordCountMap.class);
		job.setReducerClass(WordCountReduce.class);
		
		
		//设置map的输出 
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		
		
		//设置redue的输出
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		//设置文件的输入 输出路径
		
		FileInputFormat.setInputPaths(job, new Path(args[0]]));
		
		FileOutputFormat.setOutputPath(job, new Path(args[1]]));
		
		//提交任务 
		
		boolean result=job.waitForCompletion(true);
		
		System.exit(result?0:1);
		
	}
}

打成jar包在集群上运行

1. 导出jar包

技术分享图片

技术分享图片

技术分享图片

技术分享图片

技术分享图片

2. 上传jar包到Linux 在集群中运行
  • 上传的方法很多, 这里不说
  • 在Linux中启动hadoop
  • 上传文件到HDFS
$HADOOP_HOME/bin/hdfs dfs -put a.txt /a.txt
  • 输入如下命令
$HADOOP_HOME/bin/hadoop jar wc.jar /a.txt /output 
  • 打开文件查看结果
$HADOOP_HOME/bin/hdfs dfs -cat /output/pa...

以上是关于大数据阶段划分及案例单词统计的主要内容,如果未能解决你的问题,请参考以下文章

大数据Flink进阶:Flink入门案例

大数据学习之MapReduce编程案例一单词计数 10

大数据技术之WordCount案例

大数据入门基础七:流量统计案例--流量汇总求和!(项目三)

如何在C++中 统计多行文本中的行数、单词数及字符数

大数据学习笔记—MapReduce