如何使用Hadoop的MultipleOutputs进行多文件输出
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何使用Hadoop的MultipleOutputs进行多文件输出相关的知识,希望对你有一定的参考价值。
参考技术A有时候,我们使用Hadoop处理数据时,在Reduce阶段,我们可能想对每一个输出的key进行单独输出一个目录或文件,这样方便数据分析,比如根据某个时间段对日志文件进行时间段归类等等。这时候我们就可以使用MultipleOutputs类,来搞定这件事,
下面,先来看下散仙的测试数据:
Java代码
中国;我们
美国;他们
中国;123
中国人;善良
美国;USA
美国;在北美洲
输出结果:预期输出结果是:
中国一组,美国一组,中国人一组
核心代码如下:
Java代码
package com.partition.test;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import com.qin.operadb.PersonRecoder;
import com.qin.operadb.ReadMapDB;
/***
* @author qindongliang
*
* 大数据技术交流群:324714439
* **/
public class TestMultiOutput
/**
* map任务
*
* **/
public static class PMapper extends Mapper<LongWritable, Text, Text, Text>
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException
String ss[]=value.toString().split(";");
context.write(new Text(ss[0]), new Text(ss[1]));
public static class PReduce extends Reducer<Text, Text, Text, Text>
/**
* 设置多个文件输出
* */
private MultipleOutputs mos;
@Override
protected void setup(Context context)
throws IOException, InterruptedException
mos=new MultipleOutputs(context);//初始化mos
@Override
protected void reduce(Text arg0, Iterable<Text> arg1, Context arg2)
throws IOException, InterruptedException
String key=arg0.toString();
for(Text t:arg1)
if(key.equals("中国"))
/**
* 一个参数
* **/
mos.write("china", arg0,t);
else if(key.equals("美国"))
mos.write("USA", arg0,t);
else if(key.equals("中国人"))
mos.write("cperson", arg0,t);
//System.out.println("Reduce: "+arg0.toString()+" "+t.toString());
@Override
protected void cleanup(
Context context)
throws IOException, InterruptedException
mos.close();//释放资源
public static void main(String[] args) throws Exception
JobConf conf=new JobConf(ReadMapDB.class);
//Configuration conf=new Configuration();
// conf.set("mapred.job.tracker","192.168.75.130:9001");
//读取person中的数据字段
// conf.setJar("tt.jar");
//注意这行代码放在最前面,进行初始化,否则会报
/**Job任务**/
Job job=new Job(conf, "testpartion");
job.setJarByClass(TestMultiOutput.class);
System.out.println("模式: "+conf.get("mapred.job.tracker"));;
// job.setCombinerClass(PCombine.class);
//job.setPartitionerClass(PPartition.class);
//job.setNumReduceTasks(5);
job.setMapperClass(PMapper.class);
/**
* 注意在初始化时需要设置输出文件的名
* 另外名称,不支持中文名,仅支持英文字符
*
* **/
MultipleOutputs.addNamedOutput(job, "china", TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "USA", TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "cperson", TextOutputFormat.class, Text.class, Text.class);
job.setReducerClass(PReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
String path="hdfs://192.168.75.130:9000/root/outputdb";
FileSystem fs=FileSystem.get(conf);
Path p=new Path(path);
if(fs.exists(p))
fs.delete(p, true);
System.out.println("输出路径存在,已删除!");
FileInputFormat.setInputPaths(job, "hdfs://192.168.75.130:9000/root/input");
FileOutputFormat.setOutputPath(job,p );
System.exit(job.waitForCompletion(true) ? 0 : 1);
如果是中文的路径名,则会报如下的一个异常:
Java代码
模式: local
输出路径存在,已删除!
WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
WARN - JobClient.copyAndConfigureFiles(870) | No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1
WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_local1533332464_0001
INFO - LocalJobRunner$Job.run(340) | Waiting for map tasks
INFO - LocalJobRunner$Job$MapTaskRunnable.run(204) | Starting task: attempt_local1533332464_0001_m_000000_0
INFO - Task.initialize(534) | Using ResourceCalculatorPlugin : null
INFO - MapTask.runNewMapper(729) | Processing split: hdfs://192.168.75.130:9000/root/input/group.txt:0+91
INFO - MapTask$MapOutputBuffer.<init>(949) | io.sort.mb = 100
INFO - MapTask$MapOutputBuffer.<init>(961) | data buffer = 79691776/99614720
INFO - MapTask$MapOutputBuffer.<init>(962) | record buffer = 262144/327680
INFO - MapTask$MapOutputBuffer.flush(1289) | Starting flush of map output
INFO - MapTask$MapOutputBuffer.sortAndSpill(1471) | Finished spill 0
INFO - Task.done(858) | Task:attempt_local1533332464_0001_m_000000_0 is done. And is in the process of commiting
INFO - LocalJobRunner$Job.statusUpdate(466) |
INFO - Task.sendDone(970) | Task 'attempt_local1533332464_0001_m_000000_0' done.
INFO - LocalJobRunner$Job$MapTaskRunnable.run(229) | Finishing task: attempt_local1533332464_0001_m_000000_0
INFO - LocalJobRunner$Job.run(348) | Map task executor complete.
INFO - Task.initialize(534) | Using ResourceCalculatorPlugin : null
INFO - LocalJobRunner$Job.statusUpdate(466) |
INFO - Merger$MergeQueue.merge(408) | Merging 1 sorted segments
INFO - Merger$MergeQueue.merge(491) | Down to the last merge-pass, with 1 segments left of total size: 101 bytes
INFO - LocalJobRunner$Job.statusUpdate(466) |
WARN - LocalJobRunner$Job.run(435) | job_local1533332464_0001
java.lang.IllegalArgumentException: Name cannot be have a '一' char
at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.checkTokenName(MultipleOutputs.java:160)
at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.checkNamedOutputName(MultipleOutputs.java:186)
at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:363)
at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:348)
at com.partition.test.TestMultiOutput$PReduce.reduce(TestMultiOutput.java:74)
at com.partition.test.TestMultiOutput$PReduce.reduce(TestMultiOutput.java:1)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:398)
INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0%
INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_local1533332464_0001
INFO - Counters.log(585) | Counters: 17
INFO - Counters.log(587) | File Input Format Counters
INFO - Counters.log(589) | Bytes Read=91
INFO - Counters.log(587) | FileSystemCounters
INFO - Counters.log(589) | FILE_BYTES_READ=177
INFO - Counters.log(589) | HDFS_BYTES_READ=91
INFO - Counters.log(589) | FILE_BYTES_WRITTEN=71111
INFO - Counters.log(587) | Map-Reduce Framework
INFO - Counters.log(589) | Map output materialized bytes=105
INFO - Counters.log(589) | Map input records=6
INFO - Counters.log(589) | Reduce shuffle bytes=0
INFO - Counters.log(589) | Spilled Records=6
INFO - Counters.log(589) | Map output bytes=87
INFO - Counters.log(589) | Total committed heap usage (bytes)=227737600
INFO - Counters.log(589) | Combine input records=0
INFO - Counters.log(589) | SPLIT_RAW_BYTES=112
INFO - Counters.log(589) | Reduce input records=0
INFO - Counters.log(589) | Reduce input groups=0
INFO - Counters.log(589) | Combine output records=0
INFO - Counters.log(589) | Reduce output records=0
INFO - Counters.log(589) | Map output records=6
源码中关于名称的校验如下:
Java代码
/**
* Checks if a named output name is valid token.
*
* @param namedOutput named output Name
* @throws IllegalArgumentException if the output name is not valid.
*/
private static void checkTokenName(String namedOutput)
if (namedOutput == null || namedOutput.length() == 0)
throw new IllegalArgumentException(
"Name cannot be NULL or emtpy");
藏经阁 | hadoop入门,如何使用hadoop做简单统计
本文通过最简单暴力的方法带你使用hadoop,对给定微博数据进行统计 。
tips:本文建立在单机模式下,若在伪分布/分布式环境下,则需要将文件写入HDFS。
目录
1.如何安装运行hadoop
2.hadoop 自带示例 WordCount 词频统计及其原理
3.对指定微博数据做相关统计
1.如何安装运行hadoop
1.1 操作系统的选择
hadoop通常部署在Linux系统,如Ubuntu、CentOS等。
hadoop2.x 是兼容windows系统的,故也可以使用windows系统,但绝大多数企业使用Linux系统。故推荐使用Linux系统进行练习。在这里我们选择Ubuntu进行演示。
如果读者没有Linux系统,可以安装双系统、使用VMWare虚拟机安装系统或直接使用Windows系统。
1.2 hadoop的安装
进入hadoop官方网站:
http://hadoop.apache.org/releases.html
选择希望的版本,点击对应的“binary”即可下载(注:sources是源代码,需要编译。binary则直接解压即可配置使用)。
下载完成后,解压文件,即得到hadoop。
此时,若只希望了解数据统计,可跳至1.3,若要进行hadoop相关配置,可参考厦门大学林子雨老师的教程:
http://dblab.xmu.edu.cn/blog/install-hadoop/
1.3 hadoop开发环境搭建
安装jdk、eclipse。
其中jdk可通过sudo apt-get install openjdk-8-jdk安装,也可通过官网下载,解压,配置环境变量进行安装。eclipse可以使用Ubuntu自带的软件市场进行安装,也可下载官网压缩包,解压即得。
配置eclipse+hadoop开发环境,配置方法如下:
方法一(简单暴力)
在eclipse中新建一个java项目,新建文件夹lib,将图中目录所有子目录下的.jar后缀文件全部拷入lib文件夹下,选中这些包,右键菜单--Build Path--Add to Build Path 配置完成√√√√
方法二
通过网络找到与下载的hadoop相同版本的hadoop-plugin插件,放入eclipse安装目录下的plugin文件夹,重启eclipse,在Windows--Preference--Hadoop Map/Reduce中选择解压过的hadoop目录,完成配置。
2.hadoop 自带示例 WordCount 词频统计及其原理
在eclipse项目中新建java文件,将WordCount代码拷入。
WordCount的代码可从hadoop tutorial中获得,百度"hadoop tutorial"第一个非广告页面即有代码。在hadoop安装路径下的hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples.jar中也有该示例,可解压获得。这里我们给出代码:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
执行原理如下:
1.从输入目录中获取数据 。
2.TokenizerMapper类中的map方法按行读取文件,并对每行单词进行统计。
3.IntSumReducer类中的reduce方法对统计的单词进行合并,相同单词的数量相加,结果输出到指定目录下。
为方便执行,我们将Main方法输入输出目录写进代码,代码修改如下:
public static void main(String[] args) throws Exception {
String inputPath="input";
String outputPath="output";
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
修改后,在项目根目录下新建input文件夹,新建文本,写入几个单词,即可运行程序,查看程序生成的output文件夹下的内容。输入输出例子如下:
abc abc def
abc def
abc 3
def 2
注意:每次运行前要删掉output文件夹,否则程序运行报错。
3.对指定微博数据做相关统计
假设我们用以下格式存储了每条微博数据:
3432265730724512 1700709963 9 8 2012-04-07 11:49:41 大学里为了学生的安全,装了减速板,但是,有的车主却从人行道上绕。岂不更危险!
3432259120159781 1700709963 6 2 2014-05-23 14:01:58 回复@齐鲁青未了1979:辛苦,同行。//@齐鲁青未了1979:今天一套表加班,下午服务器罢工,数据验收等功能无法使用,全省如此。//@叶青:回复@绑架真理:有。出事了就说是借的公安局的。//@绑架真理:我就不信你们湖北没这种情况,政府官员向企业“借”车的。
3432251654458696 1700709963 8 5 2014-05-23 14:01:58 回复@绑架真理:有。出事了就说是借的公安局的。//@绑架真理:我就不信你们湖北没这种情况,政府官员向企业“借”车的。
3431420251258871 1628951200 456 39 2012-04-05 03:50:02 好美好惬意
3431418992535259 1628951200 695 47 2012-04-05 03:45:03 七招培养孩子独立思考
第一列:一篇微博的ID编号。
第二列:发微博用户的ID编号。
第三列~第n列:其他信息。
每列以制表符(Tab、" ")分割,基于以上格式的数据,如何使用hadoop编写程序,统计每人发了多少篇微博?假如不使用hadoop,我们可以将用户ID作为key,用户发送微博数量作为value,存入map容器中。遍历每条数据, 将map[userID].value加1即可。使用hadoop进行统计,思路上是相同的。这个过程类似于词频统计,我们可以直接使用WordCount进行修改。WordCount与用户微博数统计有如下区别:
WordCount | 微博数 | |
---|---|---|
每行文本的处理 | 以' '、' '分割 | 以' '分割 |
用到的数据 | 每行的每个单词 | 每行的第二个字符串 |
统计的内容 | 每个单词出现次数 | 每个用户ID出现次数 |
在hadoop程序中,使用mapreduce进行数据离线统计,将任务分块,分配给每台机器,再将结果合并,得到答案。需要编码的部分有map操作和reduce操作,其中reduce操作与WordCount相同,map操作是将文本中的每行进行统计,对于微博统计来说,即统计每行第二个字符串。
将WordCount中map操作做如下修改,即可得到答案:
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString()," ");
if(itr.hasMoreTokens())itr.nextToken();
if(itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
完整代码如下:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class weibo {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString()," ");
if(itr.hasMoreTokens())itr.nextToken();
if(itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
String indir, outdir;
indir = "input";
outdir = "output";
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// FileInputFormat.addInputPath(job, new Path(args[0]));
// FileOutputFormat.setOutputPath(job, new Path(args[1]));
FileInputFormat.addInputPath(job, new Path(indir));
FileOutputFormat.setOutputPath(job, new Path(outdir));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
将微博数据放在项目根目录的input目录下,运行程序,即可得到结果。
最后的最后
依然是福利时间,北京的小伙伴看过来~
想要拿证当老司机的小伙伴注意啦:远航、海驾、远大、龙泉驾校等在京驾校开学季限时400元特惠活动即将结束,团报特惠仅剩最后50个名额!!!
来不及解释,赶快上车。各学员可根据自身要求选择驾校班型,抓紧联系后台进行预约报名!
(本报名处报名:社会人士也可享受学生特惠价,学生3人团报可再减100元!)
在京学车有几大优势: 通过率极高(92%)、拿本快、学费低、服务好、免费车接车送......
你还在犹豫什么?
赶紧联系后台咨询详情吧~
猛虎细嗅蔷薇草
账号ID:mhxxqwc
后期会有更多干货推出
长按二维码关注
以上是关于如何使用Hadoop的MultipleOutputs进行多文件输出的主要内容,如果未能解决你的问题,请参考以下文章
藏经阁 | hadoop入门,如何使用hadoop做简单统计