如何使用Hadoop的MultipleOutputs进行多文件输出

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何使用Hadoop的MultipleOutputs进行多文件输出相关的知识,希望对你有一定的参考价值。

参考技术A

有时候,我们使用Hadoop处理数据时,在Reduce阶段,我们可能想对每一个输出的key进行单独输出一个目录或文件,这样方便数据分析,比如根据某个时间段对日志文件进行时间段归类等等。这时候我们就可以使用MultipleOutputs类,来搞定这件事,

下面,先来看下散仙的测试数据:

Java代码  

    中国;我们  

    美国;他们  

    中国;123  

    中国人;善良  

    美国;USA  

    美国;在北美洲  


    输出结果:预期输出结果是:
    中国一组,美国一组,中国人一组
    核心代码如下:

    Java代码  

    package com.partition.test;  

    import java.io.IOException;  

    import org.apache.hadoop.fs.FileSystem;  

    import org.apache.hadoop.fs.Path;  

    import org.apache.hadoop.io.LongWritable;  

    import org.apache.hadoop.io.Text;  

    import org.apache.hadoop.mapred.JobConf;  

    import org.apache.hadoop.mapreduce.Job;  

    import org.apache.hadoop.mapreduce.Mapper;  

    import org.apache.hadoop.mapreduce.Partitioner;  

    import org.apache.hadoop.mapreduce.Reducer;  

    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;  

    import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;  

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  

    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;  

    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  

    import com.qin.operadb.PersonRecoder;  

    import com.qin.operadb.ReadMapDB;  

    /*** 

    * @author qindongliang 

    *  

    * 大数据技术交流群:324714439 

    * **/  

    public class TestMultiOutput   

    /** 

    * map任务 

    *  

    * **/  

    public static class PMapper extends Mapper<LongWritable, Text, Text, Text>  

    @Override  

    protected void map(LongWritable key, Text value,Context context)  

    throws IOException, InterruptedException   

    String ss[]=value.toString().split(";");  

    context.write(new Text(ss[0]), new Text(ss[1]));      

      

      

    public static class PReduce extends Reducer<Text, Text, Text, Text>  

    /** 

    * 设置多个文件输出 

    * */  

    private MultipleOutputs mos;  

    @Override  

    protected void setup(Context context)  

    throws IOException, InterruptedException   

    mos=new MultipleOutputs(context);//初始化mos  

      

    @Override  

    protected void reduce(Text arg0, Iterable<Text> arg1, Context arg2)  

    throws IOException, InterruptedException   

    String key=arg0.toString();  

    for(Text t:arg1)  

    if(key.equals("中国"))   

    /** 

    * 一个参数 

    * **/  

    mos.write("china", arg0,t);   

     else if(key.equals("美国"))  

    mos.write("USA", arg0,t);      

     else if(key.equals("中国人"))  

    mos.write("cperson", arg0,t);   

      

    //System.out.println("Reduce:  "+arg0.toString()+"   "+t.toString());  

      

      

    @Override  

    protected void cleanup(  

    Context context)  

    throws IOException, InterruptedException   

    mos.close();//释放资源  

      

      

    public static void main(String[] args) throws Exception  

    JobConf conf=new JobConf(ReadMapDB.class);  

    //Configuration conf=new Configuration();  

    // conf.set("mapred.job.tracker","192.168.75.130:9001");  

    //读取person中的数据字段  

    // conf.setJar("tt.jar");  

    //注意这行代码放在最前面,进行初始化,否则会报  

    /**Job任务**/  

    Job job=new Job(conf, "testpartion");  

    job.setJarByClass(TestMultiOutput.class);  

    System.out.println("模式:  "+conf.get("mapred.job.tracker"));;  

    // job.setCombinerClass(PCombine.class);  

    //job.setPartitionerClass(PPartition.class);  

    //job.setNumReduceTasks(5);  

    job.setMapperClass(PMapper.class);  

    /** 

    * 注意在初始化时需要设置输出文件的名 

    * 另外名称,不支持中文名,仅支持英文字符 

    *  

    * **/  

    MultipleOutputs.addNamedOutput(job, "china", TextOutputFormat.class, Text.class, Text.class);  

    MultipleOutputs.addNamedOutput(job, "USA", TextOutputFormat.class, Text.class, Text.class);  

    MultipleOutputs.addNamedOutput(job, "cperson", TextOutputFormat.class, Text.class, Text.class);  

    job.setReducerClass(PReduce.class);  

    job.setOutputKeyClass(Text.class);  

    job.setOutputValueClass(Text.class);  

    String path="hdfs://192.168.75.130:9000/root/outputdb";  

    FileSystem fs=FileSystem.get(conf);  

    Path p=new Path(path);  

    if(fs.exists(p))  

    fs.delete(p, true);  

    System.out.println("输出路径存在,已删除!");  

      

    FileInputFormat.setInputPaths(job, "hdfs://192.168.75.130:9000/root/input");  

    FileOutputFormat.setOutputPath(job,p );  

    System.exit(job.waitForCompletion(true) ? 0 : 1);    

      

      


    如果是中文的路径名,则会报如下的一个异常:

    Java代码  

    模式:  local  

    输出路径存在,已删除!  

    WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable  

    WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.  

    WARN - JobClient.copyAndConfigureFiles(870) | No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).  

    INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1  

    WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded  

    INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_local1533332464_0001  

    INFO - LocalJobRunner$Job.run(340) | Waiting for map tasks  

    INFO - LocalJobRunner$Job$MapTaskRunnable.run(204) | Starting task: attempt_local1533332464_0001_m_000000_0  

    INFO - Task.initialize(534) |  Using ResourceCalculatorPlugin : null  

    INFO - MapTask.runNewMapper(729) | Processing split: hdfs://192.168.75.130:9000/root/input/group.txt:0+91  

    INFO - MapTask$MapOutputBuffer.<init>(949) | io.sort.mb = 100  

    INFO - MapTask$MapOutputBuffer.<init>(961) | data buffer = 79691776/99614720  

    INFO - MapTask$MapOutputBuffer.<init>(962) | record buffer = 262144/327680  

    INFO - MapTask$MapOutputBuffer.flush(1289) | Starting flush of map output  

    INFO - MapTask$MapOutputBuffer.sortAndSpill(1471) | Finished spill 0  

    INFO - Task.done(858) | Task:attempt_local1533332464_0001_m_000000_0 is done. And is in the process of commiting  

    INFO - LocalJobRunner$Job.statusUpdate(466) |   

    INFO - Task.sendDone(970) | Task 'attempt_local1533332464_0001_m_000000_0' done.  

    INFO - LocalJobRunner$Job$MapTaskRunnable.run(229) | Finishing task: attempt_local1533332464_0001_m_000000_0  

    INFO - LocalJobRunner$Job.run(348) | Map task executor complete.  

    INFO - Task.initialize(534) |  Using ResourceCalculatorPlugin : null  

    INFO - LocalJobRunner$Job.statusUpdate(466) |   

    INFO - Merger$MergeQueue.merge(408) | Merging 1 sorted segments  

    INFO - Merger$MergeQueue.merge(491) | Down to the last merge-pass, with 1 segments left of total size: 101 bytes  

    INFO - LocalJobRunner$Job.statusUpdate(466) |   

    WARN - LocalJobRunner$Job.run(435) | job_local1533332464_0001  

    java.lang.IllegalArgumentException: Name cannot be have a '一' char  

    at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.checkTokenName(MultipleOutputs.java:160)  

    at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.checkNamedOutputName(MultipleOutputs.java:186)  

    at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:363)  

    at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:348)  

    at com.partition.test.TestMultiOutput$PReduce.reduce(TestMultiOutput.java:74)  

    at com.partition.test.TestMultiOutput$PReduce.reduce(TestMultiOutput.java:1)  

    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)  

    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)  

    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)  

    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:398)  

    INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%  

    INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_local1533332464_0001  

    INFO - Counters.log(585) | Counters: 17  

    INFO - Counters.log(587) |   File Input Format Counters   

    INFO - Counters.log(589) |     Bytes Read=91  

    INFO - Counters.log(587) |   FileSystemCounters  

    INFO - Counters.log(589) |     FILE_BYTES_READ=177  

    INFO - Counters.log(589) |     HDFS_BYTES_READ=91  

    INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=71111  

    INFO - Counters.log(587) |   Map-Reduce Framework  

    INFO - Counters.log(589) |     Map output materialized bytes=105  

    INFO - Counters.log(589) |     Map input records=6  

    INFO - Counters.log(589) |     Reduce shuffle bytes=0  

    INFO - Counters.log(589) |     Spilled Records=6  

    INFO - Counters.log(589) |     Map output bytes=87  

    INFO - Counters.log(589) |     Total committed heap usage (bytes)=227737600  

    INFO - Counters.log(589) |     Combine input records=0  

    INFO - Counters.log(589) |     SPLIT_RAW_BYTES=112  

    INFO - Counters.log(589) |     Reduce input records=0  

    INFO - Counters.log(589) |     Reduce input groups=0  

    INFO - Counters.log(589) |     Combine output records=0  

    INFO - Counters.log(589) |     Reduce output records=0  

    INFO - Counters.log(589) |     Map output records=6  



    源码中关于名称的校验如下:

    Java代码  

    /** 

    * Checks if a named output name is valid token. 

    * @param namedOutput named output Name 

    * @throws IllegalArgumentException if the output name is not valid. 

    */  

    private static void checkTokenName(String namedOutput)   

    if (namedOutput == null || namedOutput.length() == 0)   

    throw new IllegalArgumentException(  

    "Name cannot be NULL or emtpy");  

      

藏经阁 | hadoop入门,如何使用hadoop做简单统计

本文通过最简单暴力的方法带你使用hadoop,对给定微博数据进行统计 。

tips:本文建立在单机模式下,若在伪分布/分布式环境下,则需要将文件写入HDFS。

目录

1.如何安装运行hadoop

2.hadoop 自带示例 WordCount 词频统计及其原理

3.对指定微博数据做相关统计


1.如何安装运行hadoop

1.1 操作系统的选择

hadoop通常部署在Linux系统,如Ubuntu、CentOS等。

hadoop2.x 是兼容windows系统的,故也可以使用windows系统,但绝大多数企业使用Linux系统。故推荐使用Linux系统进行练习。在这里我们选择Ubuntu进行演示。

如果读者没有Linux系统,可以安装双系统、使用VMWare虚拟机安装系统或直接使用Windows系统。

1.2 hadoop的安装

进入hadoop官方网站:

http://hadoop.apache.org/releases.html

选择希望的版本,点击对应的“binary”即可下载(注:sources是源代码,需要编译。binary则直接解压即可配置使用)。

下载完成后,解压文件,即得到hadoop。

藏经阁 | hadoop入门,如何使用hadoop做简单统计

此时,若只希望了解数据统计,可跳至1.3,若要进行hadoop相关配置,可参考厦门大学林子雨老师的教程:

http://dblab.xmu.edu.cn/blog/install-hadoop/

1.3 hadoop开发环境搭建

安装jdk、eclipse。 

其中jdk可通过sudo apt-get install openjdk-8-jdk安装,也可通过官网下载,解压,配置环境变量进行安装。eclipse可以使用Ubuntu自带的软件市场进行安装,也可下载官网压缩包,解压即得。

配置eclipse+hadoop开发环境,配置方法如下:

方法一(简单暴力)

在eclipse中新建一个java项目,新建文件夹lib,将图中目录所有子目录下的.jar后缀文件全部拷入lib文件夹下,选中这些包,右键菜单--Build Path--Add to Build Path 配置完成√√√√

藏经阁 | hadoop入门,如何使用hadoop做简单统计

藏经阁 | hadoop入门,如何使用hadoop做简单统计

方法二

通过网络找到与下载的hadoop相同版本的hadoop-plugin插件,放入eclipse安装目录下的plugin文件夹,重启eclipse,在Windows--Preference--Hadoop Map/Reduce中选择解压过的hadoop目录,完成配置。


2.hadoop 自带示例 WordCount 词频统计及其原理

在eclipse项目中新建java文件,将WordCount代码拷入。

WordCount的代码可从hadoop tutorial中获得,百度"hadoop tutorial"第一个非广告页面即有代码。在hadoop安装路径下的hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples.jar中也有该示例,可解压获得。这里我们给出代码:

 
   
   
 
  1. import java.io.IOException;

  2. import java.util.StringTokenizer;

  3. import org.apache.hadoop.conf.Configuration;

  4. import org.apache.hadoop.fs.Path;

  5. import org.apache.hadoop.io.IntWritable;

  6. import org.apache.hadoop.io.Text;

  7. import org.apache.hadoop.mapreduce.Job;

  8. import org.apache.hadoop.mapreduce.Mapper;

  9. import org.apache.hadoop.mapreduce.Reducer;

  10. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

  11. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  12. public class WordCount {

  13.  public static class TokenizerMapper

  14.       extends Mapper<Object, Text, Text, IntWritable>{

  15.    private final static IntWritable one = new IntWritable(1);

  16.    private Text word = new Text();

  17.    public void map(Object key, Text value, Context context

  18.                    ) throws IOException, InterruptedException {

  19.      StringTokenizer itr = new StringTokenizer(value.toString());

  20.      while (itr.hasMoreTokens()) {

  21.        word.set(itr.nextToken());

  22.        context.write(word, one);

  23.      }

  24.    }

  25.  }

  26.  public static class IntSumReducer

  27.       extends Reducer<Text,IntWritable,Text,IntWritable> {

  28.    private IntWritable result = new IntWritable();

  29.    public void reduce(Text key, Iterable<IntWritable> values,

  30.                       Context context

  31.                       ) throws IOException, InterruptedException {

  32.      int sum = 0;

  33.      for (IntWritable val : values) {

  34.        sum += val.get();

  35.      }

  36.      result.set(sum);

  37.      context.write(key, result);

  38.    }

  39.  }

  40.  public static void main(String[] args) throws Exception {

  41.    Configuration conf = new Configuration();

  42.    Job job = Job.getInstance(conf, "word count");

  43.    job.setJarByClass(WordCount.class);

  44.    job.setMapperClass(TokenizerMapper.class);

  45.    job.setCombinerClass(IntSumReducer.class);

  46.    job.setReducerClass(IntSumReducer.class);

  47.    job.setOutputKeyClass(Text.class);

  48.    job.setOutputValueClass(IntWritable.class);

  49.    FileInputFormat.addInputPath(job, new Path(args[0]));

  50.    FileOutputFormat.setOutputPath(job, new Path(args[1]));

  51.    System.exit(job.waitForCompletion(true) ? 0 : 1);

  52.  }

  53. }

执行原理如下: 

1.从输入目录中获取数据 。

2.TokenizerMapper类中的map方法按行读取文件,并对每行单词进行统计。

3.IntSumReducer类中的reduce方法对统计的单词进行合并,相同单词的数量相加,结果输出到指定目录下。

为方便执行,我们将Main方法输入输出目录写进代码,代码修改如下:

 
   
   
 
  1.  public static void main(String[] args) throws Exception {

  2.    String inputPath="input";

  3.    String outputPath="output";

  4.    Configuration conf = new Configuration();

  5.    Job job = Job.getInstance(conf, "word count");

  6.    job.setJarByClass(WordCount.class);

  7.    job.setMapperClass(TokenizerMapper.class);

  8.    job.setCombinerClass(IntSumReducer.class);

  9.    job.setReducerClass(IntSumReducer.class);

  10.    job.setOutputKeyClass(Text.class);

  11.    job.setOutputValueClass(IntWritable.class);

  12.    FileInputFormat.addInputPath(job, new Path(inputPath));

  13.    FileOutputFormat.setOutputPath(job, new Path(outputPath));

  14.    System.exit(job.waitForCompletion(true) ? 0 : 1);

  15.  }

  16. }

修改后,在项目根目录下新建input文件夹,新建文本,写入几个单词,即可运行程序,查看程序生成的output文件夹下的内容。输入输出例子如下:

 
   
   
 
  1. abc abc def

  2. abc def


 
   
   
 
  1. abc 3

  2. def 2

注意:每次运行前要删掉output文件夹,否则程序运行报错。


3.对指定微博数据做相关统计

假设我们用以下格式存储了每条微博数据:

 
   
   
 
  1. 3432265730724512    1700709963  9   8   2012-04-07 11:49:41 大学里为了学生的安全,装了减速板,但是,有的车主却从人行道上绕。岂不更危险!

  2. 3432259120159781    1700709963  6   2   2014-05-23 14:01:58 回复@齐鲁青未了1979:辛苦,同行。//@齐鲁青未了1979:今天一套表加班,下午服务器罢工,数据验收等功能无法使用,全省如此。//@叶青:回复@绑架真理:有。出事了就说是借的公安局的。//@绑架真理:我就不信你们湖北没这种情况,政府官员向企业“借”车的。

  3. 3432251654458696    1700709963  8   5   2014-05-23 14:01:58 回复@绑架真理:有。出事了就说是借的公安局的。//@绑架真理:我就不信你们湖北没这种情况,政府官员向企业“借”车的。

  4. 3431420251258871    1628951200  456 39  2012-04-05 03:50:02 好美好惬意

  5. 3431418992535259    1628951200  695 47  2012-04-05 03:45:03 七招培养孩子独立思考

第一列:一篇微博的ID编号。 

第二列:发微博用户的ID编号。 

第三列~第n列:其他信息。

每列以制表符(Tab、" ")分割,基于以上格式的数据,如何使用hadoop编写程序,统计每人发了多少篇微博?假如不使用hadoop,我们可以将用户ID作为key,用户发送微博数量作为value,存入map容器中。遍历每条数据, 将map[userID].value加1即可。使用hadoop进行统计,思路上是相同的。这个过程类似于词频统计,我们可以直接使用WordCount进行修改。WordCount与用户微博数统计有如下区别:


WordCount 微博数
每行文本的处理 以' '、' '分割 以' '分割
用到的数据 每行的每个单词 每行的第二个字符串
统计的内容 每个单词出现次数 每个用户ID出现次数

在hadoop程序中,使用mapreduce进行数据离线统计,将任务分块,分配给每台机器,再将结果合并,得到答案。需要编码的部分有map操作和reduce操作,其中reduce操作与WordCount相同,map操作是将文本中的每行进行统计,对于微博统计来说,即统计每行第二个字符串。

将WordCount中map操作做如下修改,即可得到答案:

 
   
   
 
  1.        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

  2.            StringTokenizer itr = new StringTokenizer(value.toString()," ");

  3.            if(itr.hasMoreTokens())itr.nextToken();

  4.            if(itr.hasMoreTokens()) {

  5.                word.set(itr.nextToken());

  6.                context.write(word, one);

  7.            }

  8.        }

完整代码如下:

 
   
   
 
  1. import java.io.IOException;

  2. import java.util.StringTokenizer;

  3. import org.apache.hadoop.conf.Configuration;

  4. import org.apache.hadoop.fs.Path;

  5. import org.apache.hadoop.io.IntWritable;

  6. import org.apache.hadoop.io.Text;

  7. import org.apache.hadoop.mapreduce.Job;

  8. import org.apache.hadoop.mapreduce.Mapper;

  9. import org.apache.hadoop.mapreduce.Reducer;

  10. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

  11. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  12. public class weibo {

  13.    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

  14.        private final static IntWritable one = new IntWritable(1);

  15.        private Text word = new Text();

  16.        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

  17.            StringTokenizer itr = new StringTokenizer(value.toString()," ");

  18.            if(itr.hasMoreTokens())itr.nextToken();

  19.            if(itr.hasMoreTokens()) {

  20.                word.set(itr.nextToken());

  21.                context.write(word, one);

  22.            }

  23.        }

  24.    }

  25.    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  26.        private IntWritable result = new IntWritable();

  27.        public void reduce(Text key, Iterable<IntWritable> values, Context context)

  28.                throws IOException, InterruptedException {

  29.            int sum = 0;

  30.            for (IntWritable val : values) {

  31.                sum += val.get();

  32.            }

  33.            result.set(sum);

  34.            context.write(key, result);

  35.        }

  36.    }

  37.    public static void main(String[] args) throws Exception {

  38.        String indir, outdir;

  39.        indir = "input";

  40.        outdir = "output";

  41.        Configuration conf = new Configuration();

  42.        Job job = Job.getInstance(conf, "word count");

  43.        job.setJarByClass(WordCount.class);

  44.        job.setMapperClass(TokenizerMapper.class);

  45.        job.setCombinerClass(IntSumReducer.class);

  46.        job.setReducerClass(IntSumReducer.class);

  47.        job.setOutputKeyClass(Text.class);

  48.        job.setOutputValueClass(IntWritable.class);

  49.        // FileInputFormat.addInputPath(job, new Path(args[0]));

  50.        // FileOutputFormat.setOutputPath(job, new Path(args[1]));

  51.        FileInputFormat.addInputPath(job, new Path(indir));

  52.        FileOutputFormat.setOutputPath(job, new Path(outdir));

  53.        System.exit(job.waitForCompletion(true) ? 0 : 1);

  54.    }

  55. }

将微博数据放在项目根目录的input目录下,运行程序,即可得到结果。


藏经阁 | hadoop入门,如何使用hadoop做简单统计

最后的最后

依然是福利时间,北京的小伙伴看过来~

想要拿证当老司机的小伙伴注意啦:远航、海驾、远大、龙泉驾校等在京驾校开学季限时400元特惠活动即将结束,团报特惠仅剩最后50个名额!!!

来不及解释,赶快上车。各学员可根据自身要求选择驾校班型,抓紧联系后台进行预约报名!

本报名处报名:社会人士也可享受学生特惠价,学生3人团报可再减100元!

在京学车有几大优势: 通过率极高(92%)、拿本快、学费低、服务好、免费车接车送......

你还在犹豫什么?

赶紧联系后台咨询详情吧~

藏经阁 | hadoop入门,如何使用hadoop做简单统计

猛虎细嗅蔷薇草

账号ID:mhxxqwc


后期会有更多干货推出  

长按二维码关注

以上是关于如何使用Hadoop的MultipleOutputs进行多文件输出的主要内容,如果未能解决你的问题,请参考以下文章

藏经阁 | hadoop入门,如何使用hadoop做简单统计

如何使用 Hadoop FS shell 将 hadoop 中的两个文件连接成一个文件?

如何检查集群中使用的 hadoop 分布?

如何使用Hadoop捆绑的低级工具进行数据提取?

如何使用idea开发hadoop程序

请教如何在hadoop获取数据