spark记录spark算子之Transformation

Posted kpsmile

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark记录spark算子之Transformation相关的知识,希望对你有一定的参考价值。

一、map、flatMap、mapParations、mapPartitionsWithIndex

1.1 map

map十分容易理解,他是将源JavaRDD的一个一个元素的传入call方法,并经过算法后一个一个的返回从而生成一个新的JavaRDD。

(1) 使用Java进行编写

public static void map() {
        List<String> list = Arrays.asList("李光洙","刘在石","哈哈","宋智孝");
        JavaRDD<String> rdd = jsc.parallelize(list);
        
        JavaRDD<String> map = rdd.map(new Function<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public String call(String name) throws Exception {
                return "hello,"+name;
            }
        });
        map.foreach(new VoidFunction<String>() {
            
            /**
             * 
             */
            private static final long serialVersionUID = 1L;

            @Override
            public void call(String msg) throws Exception {
                System.out.println(msg);
            }
        });
        
    }

(2) 使用scala进行编写

  def map(): Unit = {
    val list = List("李光洙","刘在石","哈哈","宋智孝");
    val rdd = sc.parallelize(list)
    val map = rdd.map(s => "hello," + s).foreach(println) 
  }

(3)运行结果

技术图片

(4) 总结

可以看出,对于map算子,源JavaRDD的每个元素都会进行计算,由于是依次进行传参,所以他是有序的,新RDD的元素顺序与源RDD是相同的。而由有序又引出接下来的flatMap。

1.2 flatMap

flatMap与map一样,是将RDD中的元素依次的传入call方法,他比map多的功能是能在任何一个传入call方法的元素后面添加任意多元素,而能达到这一点,正是因为其进行传参是依次进行的。

(1) 使用Java进行编写

    public static void flatmap() {
        List<String> list = Arrays.asList("李光洙 刘在石","哈哈 宋智孝");
        JavaRDD<String> rdd = jsc.parallelize(list);
        
        JavaRDD<String> map = rdd.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        }).map(new Function<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public String call(String s) throws Exception {
                return "你好," + s;
            }
        });
        
        map.foreach(new VoidFunction<String>() {
            
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });
        
    }

(2) 使用scala进行编写

  def flatmap(): Unit = {
    val list = List("李光洙 刘在石","哈哈 宋智孝");
    val rdd = sc.parallelize(list)
    rdd.flatMap(_.split(" ")).map("你好,"+_).foreach(println)
  }

(3) 运行结果

技术图片

(4) 总结

flatMap的特性决定了这个算子在对需要随时增加元素的时候十分好用,比如在对源RDD查漏补缺时。

map和flatMap都是依次进行参数传递的,但有时候需要RDD中的两个元素进行相应操作时(例如:算存款所得时,下一个月所得的利息是要原本金加上上一个月所得的本金的),这两个算子便无法达到目的了,这是便需要mapPartitions算子,他传参的方式是将整个RDD传入,然后将一个迭代器传出生成一个新的RDD,由于整个RDD都传入了,所以便能完成前面说的业务。

map是对RDD中元素逐一进行函数操作映射为另外一个RDD,而flatMap操作是将函数应用于RDD之中的每一个元素,将返回的迭代器的所有内容构成新的RDD。而flatMap操作是将函数应用于RDD中每一个元素,将返回的迭代器的所有内容构成RDD。

flatMap与map区别在于map为“映射”,而flatMap“先映射,后扁平化”,map对每一次(func)都产生一个元素,返回一个对象,而flatMap多一步就是将所有对象合并为一个对象。

1.3 mapPartitions

与map方法类似,map是对rdd中的每一个元素进行操作,而mapPartitions(foreachPartition)则是对rdd中的每个分区的迭代器进行操作。如果在map过程中需要频繁创建额外的对象(例如将rdd中的数据通过jdbc写入数据库,map需要为每个元素创建一个链接而mapPartition为每个partition创建一个链接),则mapPartitions效率比map高的多。

(1) 使用Java进行编写

    public static void mapPartitions() {
        JavaRDD<String> textFile = jsc.textFile("words",3);
        textFile.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
            
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(Iterator<String> is) throws Exception {
                System.out.println("创建数据库连接。。。。");
                List<String> list = new ArrayList<String>();
                while(is.hasNext()) {
                    list.add(is.next());
                    System.out.println("模拟向数据库插入批量数据。。。");
                }
                System.out.println("关闭数据库连接。。。");
                return list;
            }
        }).collect();
        
    }

(2) 使用scala进行编写

  def mapPartitions: Unit = {
    val rdd1 = sc.textFile("words")
    val mapResult = rdd1.mapPartitions(iter =>{
        println("打开数据库。。。")
        val list = List()
        while(iter.hasNext){
          list.addString(new StringBuilder(iter.next()))
          println("插入数据库。。。")
        }
        println("关闭数据库。。。")
        list.iterator
      }, false)
    mapResult.foreach(println)
  }

(3) 运行结果

技术图片

(4)总结

mapPartitions比较适合需要分批处理数据的情况,比如将数据插入某个表,每批数据只需要开启一次数据库连接,大大减少了连接开支。

1.4 mapPartitionsWithIndex

每次获取和处理的就是一个分区的数据,并且知道处理的分区的分区号是什么

(1)使用Java编写

    public static void mapPartitionsWithIndex() {
        List<String> list = Arrays.asList("李光洙","刘在石","哈哈","宋智孝");
        JavaRDD<String> rdd = jsc.parallelize(list,3);
        
        JavaRDD<String> rdd2 = rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<String> call(Integer index, Iterator<String> iter) throws Exception {
                List<String> list =  new ArrayList<String>();
                while(iter.hasNext()) {
                    list.add(index+"_"+iter.next());
                }
                return list.iterator();
            }
        }, true);
        
        rdd2.foreach(new VoidFunction<String>() {
            
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });
        
    }

(2)使用scala编写

def mapPartitionsWithIndex: Unit = {
    val list = List("李光洙","刘在石","哈哈","宋智孝")
    val rdd1 = sc.parallelize(list, 3) 
    val rdd2 = rdd1.mapPartitionsWithIndex((index,iter)=>{
      val l = ListBuffer[String]()
      while(iter.hasNext){
        val v = iter.next()
        l.append(index+"_"+v)
      }
      l.iterator
    }, true).foreach(println)
  }

(3)结果

技术图片

 

以上是关于spark记录spark算子之Transformation的主要内容,如果未能解决你的问题,请参考以下文章

Spark算子执行流程详解之六

Spark之RDD算子-转换算子

Spark算子篇 --Spark算子之aggregateByKey详解

Spark算子篇 --Spark算子之combineByKey详解

大数据之Spark:Spark调优之RDD算子调优

Spark之RDD算子