大数据小文件问题与企业级解决方案
Posted 赵广陆
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据小文件问题与企业级解决方案相关的知识,希望对你有一定的参考价值。
1 MapReduce性能优化
现在大家已经掌握了MapReduce程序的开发步骤,注意了,针对MapReduce的案例我们并没有讲太多,主要是因为在实际工作中真正需要我们去写MapReduce代码的场景已经是凤毛麟角了,因为后面我们会学习一个大数据框架Hive,Hive支持SQL,这个Hive底层会把SQL转化为MapReduce执行,不需要 我们写一行代码,所以说工作中的大部分需求我们都使用SQL去实现了,谁还苦巴巴的来写代码啊,一行SQL能抵你写的几十行代码,你还想去写MapReduce代码吗,肯定不想了。
但是MapReduce代码的开发毕竟是基本功,所以前面我们也详细的讲解了它的开发流程。
虽然现在MapReduce代码写的很少了,但是针对MapReduce程序的性能优化是少不了的,面试也是经
常会问到的,所以下面我们就来分析一下MapReduce中典型的性能优化场景
第一个场景是:小文件问题
第二个场景是:数据倾斜问题
2 小文件问题
先一个一个来,不要着急,我们先看小文件问题
咱们前面分析过,Hadoop的HDFS和MapReduce都是针对大数据文件来设计的,在小文件的处理上不但
效率低下,而且十分消耗内存资源针对HDFS而言,每一个小文件在namenode中都会占用150字节的内存空间,最终会导致集群中虽然存储了很多个文件,但是文件的体积并不大,这样就没有意义了。
针对MapReduce而言,每一个小文件都是一个Block,都会产生一个InputSplit,最终每一个小文件都会 产生一个map任务,这样会导致同时启动太多的Map任务,Map任务的启动是非常消耗性能的,但是启
动了以后执行了很短时间就停止了,因为小文件的数据量太小了,这样就会造成任务执行消耗的时间还没有启动任务消耗的时间多,这样也会影响MapReduce执行的效率。
针对这个问题,解决办法通常是选择一个容器,将这些小文件组织起来统一存储,HDFS提供了两种类型的容器,分别是SequenceFile 和 MapFileSequeceFile是Hadoop 提供的一种二进制文件,这种二进制文件直接将<key, value>对序列化到文件
中。
一般对小文件可以使用这种文件合并,即将小文件的文件名作为key,文件内容作为value序列化到大文
件中但是这个文件有一个缺点,就是它需要一个合并文件的过程,最终合并的文件会比较大,并且合并后的文件查看起来不方便,必须通过遍历才能查看里面的每一个小文件所以这个SequenceFile 其实可以理解为把很多小文件压缩成一个大的压缩包了。
下面我们来具体看一下如何生成SequenceFile
生成SequenceFile需要开发代码
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;
import java.io.File;
/**
* 小文件解决方案之SequenceFile
*/
public class SmallFileSeq {
public static void main(String[] args) throws Exception {
//生成SequenceFile文件
write("D:\\\\smallFile", "/seqFile");
//读取SequenceFile文件
read("/seqFile");
}
/**
* 生成SequenceFile文件
*
* @param inputDir 输入目录-windows目录
* @param outputFile 输出文件-hdfs文件
* @throws Exception
*/
private static void write(String inputDir, String outputFile)
throws Exception {
//创建一个配置对象
Configuration conf = new Configuration();
//指定HDFS的地址
conf.set("fs.defaultFS", "hdfs://bigdata01:9000");
//获取操作HDFS的对象
FileSystem fileSystem = FileSystem.get(conf);
//删除输出文件
fileSystem.delete(new Path(outputFile), true);
//构造opts数组,有三个元素
/*
第一个是输出路径
第二个是key类型
第三个是value类型
*/
SequenceFile.Writer.Option[] opts = new SequenceFile.Writer.Option[]{
SequenceFile.Writer.file(new Path(outputFile)),
SequenceFile.Writer.keyClass(Text.class),
SequenceFile.Writer.valueClass(Text.class)};
//创建一个writer实例
SequenceFile.Writer writer = SequenceFile.createWriter(conf, opts);
//指定要压缩的文件的目录
File inputDirPath = new File(inputDir);
if (inputDirPath.isDirectory()) {
File[] files = inputDirPath.listFiles();
for (File file : files) {
//获取文件全部内容
String content = FileUtils.readFileToString(file, "UTF-8");
//文件名作为key
Text key = new Text(file.getName());
//文件内容作为value
Text value = new Text(content);
writer.append(key, value);
}
}
writer.close();
}
*
@param
inputFile SequenceFile文件路径
*@throws Exception
*/
private static void read(String inputFile)
throws Exception {
//创建一个配置对象
Configuration conf = new Configuration();
//指定HDFS的地址
conf.set("fs.defaultFS", "hdfs://bigdata01:9000");
//创建阅读器
SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFi
Text key = new Text();
Text value = new Text();
//循环读取数据
while (reader.next(key, value)) {
//输出文件名称
System.out.print("文件名:" + key.toString() + ",");
//输出文件的内容
System.out.println("文件内容:" + value.toString());
}
reader.close();
}
}
执行代码中的write方法,可以看到在HDFS上会产生一个/seqFile文件,这个文件就是最终生成的大文件
执行代码中的read方法,可以输出小文件的名称和内容
接下来我们来看一下MapFile
MapFile是排序后的SequenceFile,MapFile由两部分组成,分别是index和data
index作为文件的数据索引,主要记录了每个Record的key值,以及该Record在文件中的偏移位置。在
MapFile被访问的时候,索引文件会被加载到内存,通过索引映射关系可迅速定位到指定Record所在文件 位置,因此,相对SequenceFile而言,MapFile的检索效率是高效的,缺点是会消耗一部分内存来存储
index数据。
代码实现如下:
package com.oldlu.mr;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import java.io.File;
/**
* 小文件解决方案之MapFile
*/
public class SmallFileMap {
public static void main(String[] args) throws Exception{
//生成MapFile文件
write("D:\\\\smallFile","/mapFile");
read("/mapFile");
}
/**
* 生成MapFile文件
* @param inputDir 输入目录-windows目录
* @param outputDir 输出目录-hdfs目录
* @throws Exception
*/
private static void write(String inputDir,String outputDir)
throws Exception{
//创建一个配置对象
Configuration conf = new Configuration();
//指定HDFS的地址
conf.set("fs.defaultFS","hdfs://bigdata01:9000");
//获取操作HDFS的对象
FileSystem fileSystem = FileSystem.get(conf);
//删除输出目录
fileSystem.delete(new Path(outputDir),true);
//构造opts数组,有两个元素
/*
第一个是key类型
第二个是value类型
*/
SequenceFile.Writer.Option[] opts = new SequenceFile.Writer.Option[]{
MapFile.Writer.keyClass(Text.class),
MapFile.Writer.valueClass(Text.class)};
//创建一个writer实例
MapFile.Writer writer = new MapFile.Writer(conf,new Path(outputDir),o
//指定要压缩的文件的目录
File inputDirPath = new File(inputDir);
if(inputDirPath.isDirectory()){
File[] files = inputDirPath.listFiles();
for (File file : files) {
//获取文件全部内容
String content = FileUtils.readFileToString(file, "UTF-8");
//文件名作为key
Text key = new Text(file.getName());
//文件内容作为value
Text value = new Text(content);
writer.append(key,value);
}
}
writer.close();
}
/**
* 读取MapFile文件
* @param inputDir MapFile文件路径
* @throws Exception
*/
private static void read(String inputDir)
throws Exception{
//创建一个配置对象
Configuration conf = new Configuration();
//指定HDFS的地址
conf.set("fs.defaultFS","hdfs://bigdata01:9000");
//创建阅读器
MapFile.Reader reader = new MapFile.Reader(new Path(inputDir),conf);
//循环读取数据
while(reader.next(key,value)){
//输出文件名称
System.out.print("文件名:"+key.toString()+",");
//输出文件的内容
System.out.println("文件内容:"+value.toString());
}
reader.close();
}
}
执行代码中的write方法,可以看到在HDFS上会产生一个/mapFile目录,这个目录里面有两个文件,一个
index索引文件,一个data数据文件
执行代码中的read方法,可以输出小文件的名称和内容
下面我们来看一个案例
我们来使用SequenceFile实现小文件的存储和计算
小文件的存储刚才我们已经通过代码实现了,接下来我们要实现如何通过MapReduce读取SequenceFile
咱们之前的代码默认只能读取普通文本文件,针对SequenceFile是无法读取的
那该如何设置才能让mapreduce可以读取SequenceFile呢?
很简单,只需要在job中设置输入数据处理类就行了,默认情况下使用的是TextInputFormat
创建一个新的类WordCountJobSeq
注意修改两个地方
- 修改job中的设置输入数据处理类
- 修改map中k1的数据类型为Text类型
job.setInputFormatClass(SequenceFileInputFormat.class)
创建一个新的类WordCountJobSeq
注意修改两个地方
- 修改job中的设置输入数据处理类
- 修改map中k1的数据类型为Text类型
执行成功以后查看结果
package com.oldlu.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* 需求:读取SequenceFile文件
* Created by xuwei
*/
public class WordCountJobSeq {
/**
* public static class MyMapper extends Mapper<Text, Text,Text,LongWritable>
* Logger logger = LoggerFactory.getLogger(MyMapper.class);
* /**
* 需要实现map函数
* 这个map函数就是可以接收<k1,v1>,产生<k2,v2>
*
* @param k1
* @param v1
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(Text k1, Text v1, Context context)
throws IOException, InterruptedException {
//输出k1,v1的值
System.out.println("<k1,v1>=<" + k1.toString() + "," + v1.toString() + ">
//logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
//k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
//对获取到的每一行数据进行切割,把单词切割出来
String[]words = v1.toString().split(" ");
//迭代切割出来的单词数据
for (String word : words) {
//把迭代出来的单词封装成<k2,v2>的形式
Text k2 = new Text(word);
LongWritable v2 = new LongWritable(1L);
//把<k2,v2>写出去
context.write(k2, v2);
}
}
}
/**
* Reduce阶段
*/
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongW
Logger logger =LoggerFactory.getLogger(MyReducer.class);
/**
* 针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去
*
* @param k2
* @param v2s
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text k2,Iterable<LongWritable> v2s,Context co
throws IOException,InterruptedException{
//创建一个sum变量,保存v2s的和
long sum=0L;
//对v2s中的数据进行累加求和
for(LongWritable v2:v2s){
//输出k2,v2的值
//System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+"
//logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
sum+=v2.get();
}
//组装k3,v3
Text k3=k2;
LongWritable v3=new LongWritable(sum);
//输出k3,v3的值
//System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
//logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
context.write(k3,v3);
}
}
/**
* 组装Job=Map+Reduce
*/
public static void main(String[]args){
try{
if(args.length!=2){
//如果传递的参数不够,程序直接退出
System.exit(100);
}
//指定Job需要的配置参数
Configuration conf=new Configuration();
//创建一个Job
Job job=Job.getInstance(conf);
//注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个
job.setJarByClass(WordCountJobSeq.class);
//指定输入路径(可以是文件,也可以是目录)
FileInputFormat.setInputPaths(job,new Path(args[0]));
//指定输出路径(只能指定一个不存在的目录)
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//指定map相关的代码
job.setMapperClass(MyMapper.class);
//指定k2的类型
job.setMapOutputKeyClass(Text.class);
//指定v2的类型
job.setMapOutputValueClass(LongWritable.class);
//设置输入数据处理类
job.setInputFormatClass(SequenceFileInputFormat.class);
//指定reduce相关的代码
job.setReducerClass(MyReducer.class);
//指定k3的类型
job.setOutputKeyClass(Text.class);
//指定v3的类型
job.setOutputValueClass(LongWritable.class);
//提交job
job.waitForCompletion(true);
}catch(Exception e){
e.printStackTrace();
}
}
}
执行成功以后查看结果
[root@bigdata01 hadoop-3.2.0]# hdfs dfs -cat /out10/*
hello 10
you 10
此时到yarn的web界面上查看map任务的个数,发现只有1个,说明这样是生效的。
查看map任务的日志,查看打印的k1,v1日志信息
Log Type: stdout
Log Length: 301
<k1,v1>=<file1.txt,hello you>
<k1,v1>=<file10.txt,hello you>
<k1,v1>=<file2.txt,hello you>
<k1,v1>=<file3.txt,hello you>
<k1,v1>=<file4.txt,hello you>
<k1,v1>=<file5.txt,hello you>
<k1,v1>=<file6.txt,hello you>
<k1,v1>=<file7.txt,hello you>
<k1,v1>=<file8.txt,hello you>
<k1,v1>=<file9.txt,hello you>
以上是关于大数据小文件问题与企业级解决方案的主要内容,如果未能解决你的问题,请参考以下文章
Oracle 数据库 - 使用UEStudio修改dmp文件版本号,解决imp命令恢复的数据库与dmp本地文件版本号不匹配导致的导入失败问题,“ORACLE error 12547”问题处理(代码片段
大数据技术之_05_Hadoop学习_04_MapReduce_Hadoop企业优化(重中之重)+HDFS小文件优化方法+MapReduce扩展案例+倒排索引案例(多job串联)+TopN案例+找博客