如何自定义一个hadoop mapreducer中reducer输出的时候以csv文件输出。

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何自定义一个hadoop mapreducer中reducer输出的时候以csv文件输出。相关的知识,希望对你有一定的参考价值。

如何编写 个新的outputFormat类,它类似于TextOutputFormat,只是输出的时候以csv文件格式输出

参考技术A outputformat类应该是用来定义输出文件的格式吧,输出都在HDFS上,不分文件格式吧。你可以导出的时候自己转追问

可以详述清楚点么?

追答

清楚的讲法:你的想法实现不了

追问

那有没有这种呢,它不是有个TextoutputFormat类吗,这个直接输出key和value写入文件,如果是我再将这些写入的文件读取出来用csv输出,这种怎么实现呢?

追答

这就简单多了,hdfs有一个下载的命令,一般都是txt形式的,txt你自己转成csv也可以,或者下载的时候指定文件名为csv,但是推荐你前者,因为它本身应该是没有csv编码功能的。
这些都是个人意见

追问

可以给个实例我看看么?这样说我太抽象了。。

追答

我都不知道你的命令在哪里执行,我只能帮你到这里了,再多我想你还是去百度看看吧

参考技术B 设定输出分隔符为“,”,并且multipleoutputtformat自定义输出文件名为*.csv就可以了 参考技术C csv是以逗号分割的纯文本数据文件,你只要掌握格式,就ok
http://baike.baidu.com/view/468993.htm追问

csv的格式我知道,我就是不知道怎么reducer输出为csv,不知道怎么指定一个新的输出格式

hadoop离线day05--Hadoop MapReduce

hadoop离线day05--Hadoop MapReduce


今日内容大纲

#1、MR序列化机制


    什么叫做序列化 使用场景
    Java中序列化机制
    Hadoop序列化机制 Writable
    自定义对象类型能否在MR中使用传递。



#2、自定义排序


    默认字典序  a-z 正序 升序
    如果需要倒序 如何实现?
    Comparable接口 CompareTo方法

 

#3、自定义分区


    默认分区规则 HashPartitioner
    探究分区个数和reducetask个数之间的关系



#4、MR中优化组件 Combiner (了解)


    map 局部计算
    combiner局部聚合计算
    reduce全局聚合计算


#5、MR的并行度机制


    maptask reducetask是如何决定的

 

6、再次深入MapReduce 梳理流程  细节补充完整


    mapper阶段执行流程
    reducer阶段执行流程
    shuffle 洗牌


  • MapReduce编程技巧

    • 核心:牢牢把握住key 确定key是什么。

    • 因为MR很多默认属性都和key相关。

      排序  key字典序
      分区  keyhash % numReducetask
      分组  key相同
  • Hadoop序列化机制

    • 什么是序列化

      结构化对象和字节流之间转换问题。
      ​
      对象--->字节流  序列化
      字节流-->对象   反序列化
    • 什么场合需要进行序列化

      跨进程、跨网络传递数据的时候  对象数据持久化的时候
    • java中序列化

      #java.io.Serializable  接口
      在java中 如果对象需要序列化 只有实现java.io.Serializable接口即可。
      ​
      标记用法。
      ​
      java的序列化机制比较庞大臃肿,在序列化的时候不够干脆 额外携带很多附属信息。
      hadoop就认为在大数据处理中效率不高。
    • hadoop序列化

      //hadoop自己实现了一个序列化机制 Writable接口
      ​
      public interface Writable 
          
          void write(DataOutput var1) throws IOException; //序列化方法
      ​
          void readFields(DataInput var1) throws IOException;//反序列化方法
      
      ​
      //用户可以自己指定序列化什么属性 哪些属性 什么顺序序列化
      ​
      ​
      //并且基于Writable接口 Hadoop自己封装了一套数据类型  建议在MapReduce中使用
      ​
      //自己定义的对象可以在MapReduce中使用,前提是实现Writable接口
  • MapReduce的自定义排序

    • mr中key有默认的排序行为 :key的字典序、正序

    • 思路

      • 如果你的需求是正序字典序排序且你的数据类型是基本类型 此时直接把需要排序的数据作为key即可参与排序。

      • 如果你的需求是倒序或者你的数据类型是对象引用类型 直接作为key不符合你的需求 重写排序规则。

    • java中对象是如何实现排序的

      对象需要实现接口Comparable 重写CompareTo方法
      ​
      a.compareTo(b)
      ​
      返回0    表示相等
      返回负数  表示a < b
      返回正数  表示a > b
      ​
      #底层比较什么 http://ascii.911cha.com/
      ​
      ​
      #如何实现倒序  改变结果正负即可
      ​
      a 97 
      b 98
      a.compareTo(b) -1       a < b   ab
                     -(-1)    a > b   ba
    • 对应自定义对象类型 需要实现排序规则

      • 对象中通常包含多个属性 需要在CompareTo方法中指定 根据哪些属性何种方式进行排序。

    • 关于迭代器的遍历

      • 增强for循环 iter

      • 迭代器while循环遍历

        Iterator<Text> iterator = values.iterator();
                while (iterator.hasNext())
                    iterator.next();
                
  • MapReduce的输出可以直接作为下一个MapReduce的输入来处理

    • MapReduce可以自己识别里面那些是校验文件 那些是标记文件 那些是真正的数据文件。

  • MapReduce自定义分区

    • 思路

      输出到不同文件中--->reducetask个数>1---->默认只有1个,如何变成多个--->job.setNumReduceTasks(N)--->意味着要分区partition---->默认分区规则HashPartitioner--->默认分区规则满足你业务需求吗?---->满足,直接用----->如果不满足--->重写分区规则--->如何重写呢?--->接下来仔细看!!
    • 默认分区规则 HashPartitioner

      public class HashPartitioner<K, V> extends Partitioner<K, V> 
      ​
        /** Use @link Object#hashCode() to partition. */
        public int getPartition(K key, V value,
                                int numReduceTasks) 
          return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        
      ​
      
      ​
      //核心逻辑    key.hashCode()  % numReduceTasks
      // & Integer.MAX_VALUE 有些数据hash值是负数 通过与计算变成正数
    • 扩展:实现男的在一个分区,女的在一个分区

      public class AllenPartitioner<K, V> extends Partitioner<K, V> 
      ​
        
        public int getPartition(K key, V value,
                                int numReduceTasks) 
          if("男".equals(k))
              return 0;
          else
              return 1;
          
        
      
    • 如何让自己的分区类在MapReduce中生效

       job.setPartitionerClass(AllenPartitioner.class);
  • MapReduce Combiner组件

    • 中文叫做规约

      数据归约是指在尽可能保持数据原貌的前提下,最大限度地精简数据量。
      ​
              Combine input records=54
              Combine output records=21
    • Combiner是一个优化的组件,默认情况下不存在。慎重使用。

    • 以下场景慎重使用

      • 结果和数据个数有关的

      • 结果和数据的顺序有关的

    • Combiner本质就是reducer 只不过聚合范围不是全局聚合 而是局部聚合。

    • 栗子

      #如果评估局部聚合的逻辑和最终reduce聚合的逻辑一样 可以省去写Combiner类 直接使用Reducer代替Combiner
      ​
      #需要在代码中进行设置
      job.setCombinerClass(xxxx)

MapReduce并行度机制

  • 并行度是什么

    • 指的是maptask reducetask个数决定机制。 多个task一起运行不就是所谓的并行嘛。

  • Maptask并行度机制

    • 机制: 逻辑切片 原理见画图

    • Q:影响maptask个数的有哪些因素。

      文件个数
      文件大小
      split size
    • 因为maptask并行度机制可以根据数据量自适应调整maptask个数,因为在正常情况下,不需要用户干预maptask并行度的。

  • reducetask并行度机制

    • 机制:手动决定机制。

    • 默认情况下,不管数据量多大,全局只有一个reducetask.

    • 用户可以通过代码去设置reducetask个数。

      job.setNumReduceTasks(N)
    • 一旦设置多个reducetask 需要考虑数据会分区。一旦分区导致数据输出结果在不同文件中。

    • 对于需要全局统计 全局计数的需求 只能使用一个reducetask。

    • 当有多个reducetask,如果数据本身不规则或者分区策略选择不好,会导致reduce阶段数据倾斜。


MapReduce分区、分组区别

  • 分区

    • 发生map阶段输出数据到缓冲区之前。

    • 默认规则是Hashpartitioner.

    • 导致key相同的数据会被分到同一个分区 交给同一个reducetask来处理。

  • 分组

    • 发送reduce阶段数据调用reduce方法之前。

    • 默认规则是前后key一样即为一组

    • 导致相同的这一组数据去一起调用reduce方法。


 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

以上是关于如何自定义一个hadoop mapreducer中reducer输出的时候以csv文件输出。的主要内容,如果未能解决你的问题,请参考以下文章

hadoop离线day05--Hadoop MapReduce

hadoop离线day05--Hadoop MapReduce

大数据之Hadoop(MapReduce):自定义InputFormat

Hadoop学习之路MapReduce自定义排序

Hadoop中的MapReduce框架原理自定义Partitioner步骤在Job驱动中,设置自定义PartitionerPartition 分区案例

Hadoop_28_MapReduce_自定义 inputFormat