spark2.x由浅入深深到底系列六之RDD java api详解二

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark2.x由浅入深深到底系列六之RDD java api详解二相关的知识,希望对你有一定的参考价值。

package com.twq.javaapi.java7;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;

/**
 * Created by tangweiqun on 2017/9/16.
 */
public class BaseActionApiTest {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("appName").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Integer> listRDD = sc.parallelize(Arrays.asList(1, 2, 4, 3, 3, 6), 2);

        //结果: [1, 2, 4, 3, 3, 6]
        System.out.println("collect = " + listRDD.collect());
        //结果:[1, 2]
        System.out.println("take(2) = " + listRDD.take(2));
        //结果:[6, 4]
        System.out.println("top(2) = " + listRDD.top(2));
        //结果:1
        System.out.println("first = " + listRDD.first());
        //结果:1
        System.out.println("min = " + listRDD.min(new AscComparator()));
        //结果:6
        System.out.println("min = " + listRDD.min(new DescComparator()));
        //结果:6
        System.out.println("max = " + listRDD.max(new AscComparator()));
        //结果:1
        System.out.println("max = " + listRDD.max(new DescComparator()));
        //结果:[1, 2]
        System.out.println("takeOrdered(2) = " + listRDD.takeOrdered(2));
        //结果:[1, 2]
        System.out.println("takeOrdered(2)  = " + listRDD.takeOrdered(2, new AscComparator()));
        //结果:[6, 4]
        System.out.println("takeOrdered(2)  = " + listRDD.takeOrdered(2, new DescComparator()));

        listRDD.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer element) throws Exception {
                //这个性能太差,遍历每一个元素的时候都需要调用比较耗时的getInitNumber
                //建议采用foreachPartition来代替foreach操作
                Integer initNumber = getInitNumber("foreach");
                System.out.println((element + initNumber) + "=========");
            }
        });

        listRDD.foreachPartition(new VoidFunction<Iterator<Integer>>() {
            @Override
            public void call(Iterator<Integer> integerIterator) throws Exception {
                //和foreach api的功能是一样,只不过一个是将函数应用到每一条记录,这个是将函数应用到每一个partition
                //如果有一个比较耗时的操作,只需要每一分区执行一次这个操作就行,则用这个函数
                //这个耗时的操作可以是连接数据库等操作,不需要计算每一条时候去连接数据库,一个分区只需连接一次就行
                Integer initNumber = getInitNumber("foreach");
                while (integerIterator.hasNext()) {
                    System.out.println((integerIterator.next() + initNumber) + "=========");
                }
            }
        });

        Integer reduceResult = listRDD.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer ele1, Integer ele2) throws Exception {
                return ele1 + ele2;
            }
        });
        //结果:19
        System.out.println("reduceResult = " + reduceResult);

        Integer treeReduceResult = listRDD.treeReduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }, 3);
        //结果:19
        System.out.println("treeReduceResult = " + treeReduceResult);

        //和reduce的功能类似,只不过是在计算每一个分区的时候需要加上初始值0,最后再将每一个分区计算出来的值相加再加上这个初始值
        Integer foldResult = listRDD.fold(0, new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
        //结果:19
        System.out.println("foldResult = " + foldResult);

        //先初始化一个我们想要的返回的数据类型的初始值
        //然后在每一个分区对每一个元素应用函数一(acc, value) => (acc._1 + value, acc._2 + 1)进行聚合
        //最后将每一个分区生成的数据应用函数(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)进行聚合
        Tuple2 aggregateResult = listRDD.aggregate(new Tuple2<Integer, Integer>(0, 0),
                new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc, Integer integer) throws Exception {
                        return new Tuple2<>(acc._1 + integer, acc._2 + 1);
                    }
                }, new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) throws Exception {
                        return new Tuple2<>(acc1._1 + acc2._1, acc1._2 + acc2._2);
                    }
                });
        //结果:(19,6)
        System.out.println("aggregateResult = " + aggregateResult);

        Tuple2 treeAggregateResult = listRDD.treeAggregate(new Tuple2<Integer, Integer>(0, 0),
                new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc, Integer integer) throws Exception {
                        return new Tuple2<>(acc._1 + integer, acc._2 + 1);
                    }
                }, new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) throws Exception {
                        return new Tuple2<>(acc1._1 + acc2._1, acc1._2 + acc2._2);
                    }
                });
        //结果:(19,6)
        System.out.println("treeAggregateResult = " + treeAggregateResult);


    }

    public static Integer getInitNumber(String source) {
        System.out.println("get init number from " + source + ", may be take much time........");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 1;
    }

    private static class AscComparator implements Comparator<Integer>, Serializable {

        @Override
        public int compare(java.lang.Integer o1, java.lang.Integer o2) {
            return o1 - o2;
        }

    }

    private static class DescComparator implements Comparator<Integer>, Serializable {

        @Override
        public int compare(java.lang.Integer o1, java.lang.Integer o2) {
            return o2 - o1;
        }
    }

}



对于reduce, treeReduce, fold, aggragate, treeAggrate等api的详细原理,可以参考spark core RDD api原理详解

以上是关于spark2.x由浅入深深到底系列六之RDD java api详解二的主要内容,如果未能解决你的问题,请参考以下文章

spark2.x由浅入深深到底系列六之RDD java api详解二

spark2.x由浅入深深到底系列六之RDD java api详解四

spark2.x由浅入深深到底系列六之RDD java api调用scala api的原理

spark2.x由浅入深深到底系列六之RDD 支持java8 lambda表达式

spark2.x由浅入深深到底系列六之RDD java api用JdbcRDD读取关系型数据库

spark2.x由浅入深深到底系列七之RDD python api详解一