Spark action operators

Posted by dhname


This post introduces the common Spark action operators, illustrated with a Java example.
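Actions are what make Spark actually run a job. Transformations such as `reduceByKey` only record how a new RDD would be computed, and the work happens when an action such as `collect` or `count` is called. Java 8 Streams use a similar split between lazy intermediate operations and eager terminal operations, so the plain-Java analogy below (no Spark involved; the class and method names are hypothetical) counts how often a `map` function has run before and after a terminal operation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-Java analogy, not Spark code: Stream.map is lazy like a Spark transformation,
// and Stream.collect triggers the work like a Spark action.
class LazyPipelineDemo {

    /** Returns {map calls before the terminal op, map calls after it}. */
    static int[] countMapCalls(List<Integer> input) {
        AtomicInteger calls = new AtomicInteger();
        // Intermediate op: only describes the computation, nothing runs yet.
        Stream<Integer> doubled = input.stream().map(x -> {
            calls.incrementAndGet();
            return x * 2;
        });
        int before = calls.get();
        // Terminal op: runs the whole pipeline, one map call per element.
        List<Integer> result = doubled.collect(Collectors.toList());
        System.out.println(result); // [2, 4, 6, 8, 10]
        return new int[] {before, calls.get()};
    }

    public static void main(String[] args) {
        int[] calls = countMapCalls(Arrays.asList(1, 2, 3, 4, 5));
        System.out.println("before: " + calls[0] + ", after: " + calls[1]); // before: 0, after: 5
    }
}
```

The analogy is only about evaluation order: Spark additionally ships the recorded lineage to executors, which Streams do not.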

package action;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
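 * Demonstrates the common Spark action operators (reduce, collect, count, first, take,
 * takeSample, saveAsTextFile, countByKey, foreach) using the Java API.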
 *
 * @ClassName: actions
 * @author: DingH
 * @since: 2019/4/2 10:53
 */
public class actions {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("actions").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

        JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(Arrays.asList(
                new Tuple2<String, Integer>("aaaa", 111),
                new Tuple2<String, Integer>("aaaa", 111),
                new Tuple2<String, Integer>("bbbb", 222),
                new Tuple2<String, Integer>("bbbb", 222),
                new Tuple2<String, Integer>("bbbb", 222),
                new Tuple2<String, Integer>("ccc", 333)
        ));

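        // reduceByKey is a transformation (lazy): it sums the values of each key, giving
        // (aaaa,222), (bbbb,666), (ccc,333). Nothing runs until an action is called.
        JavaPairRDD<String, Integer> rdd1 = rdd.reduceByKey(new Function2<Integer, Integer, Integer>() {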
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });

        // reduce (action): merges all elements with the given function. Spark reduces each
        // partition first and then merges the partial results, so the function should be
        // associative and commutative. Here the keys are concatenated (their order is not
        // guaranteed) and the values are summed: 222 + 666 + 333 = 1221.
        Tuple2<String, Integer> reduce = rdd1.reduce(new Function2<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            public Tuple2<String, Integer> call(Tuple2<String, Integer> a, Tuple2<String, Integer> b) throws Exception {
                return new Tuple2<String, Integer>(a._1 + b._1, a._2 + b._2);
            }
        });

        System.out.println(reduce);

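        // collect (action): returns every element to the driver as a List,
        // so it is only safe for small RDDs.
        List<Tuple2<String, Integer>> collect = rdd1.collect();
        for (Tuple2<String, Integer> tt : collect) {
            System.out.println(tt);
        }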

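        // count (action): number of elements, here 3 (one per distinct key)
        long count = rdd1.count();
        System.out.println(count);

        // first (action): the first element of the RDD
        Tuple2<String, Integer> first = rdd1.first();
        System.out.println(first);

        // take(n) (action): the first n elements; fewer if the RDD is smaller (only 3 here)
        List<Tuple2<String, Integer>> take = rdd1.take(4);
        System.out.println(take);

        // takeSample(withReplacement, num) (action): a random sample of num elements
        List<Tuple2<String, Integer>> tuple2s = rdd1.takeSample(false, 3);
        System.out.println(tuple2s);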

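        // saveAsTextFile (action): writes each element's toString() as one line into part-*
        // files under the given directory. An empty path is rejected; "output/actions" is only
        // a placeholder, and the directory must not already exist.
        rdd1.saveAsTextFile("output/actions");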

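        // countByKey (action): number of elements per key, returned to the driver as a Map.
        // After reduceByKey every key occurs once, so each count here is 1.
        // Spark 2.x+ declares Map<K, Long>; Spark 1.x declared Map<K, Object>.
        Map<String, Long> countsByKey = rdd1.countByKey();
        System.out.println(countsByKey);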

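        // foreach (action): runs the function on the executors for every element and returns
        // nothing to the driver. In local mode the output appears in this console; on a
        // cluster it goes to the executor logs instead.
        rdd1.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> t) throws Exception {
                System.out.println(t);
            }
        });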

        sc.stop();
    }
}
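The listing calls `reduceByKey` on the raw pairs and later `countByKey` on the reduced RDD. To make the difference concrete, here is a plain-Java model (no Spark; the class, field and method names are hypothetical) applied to the same six pairs. It also shows why `countByKey` on `rdd1` reports 1 for every key: `reduceByKey` has already collapsed each key into a single element.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model, not Spark code: reduceByKey merges the values of each key with a
// function, while countByKey only counts how many elements carry each key.
class ByKeyModel {

    // The six pairs passed to parallelizePairs in the example.
    static final List<Map.Entry<String, Integer>> DATA = Arrays.<Map.Entry<String, Integer>>asList(
        new SimpleEntry<>("aaaa", 111), new SimpleEntry<>("aaaa", 111),
        new SimpleEntry<>("bbbb", 222), new SimpleEntry<>("bbbb", 222),
        new SimpleEntry<>("bbbb", 222), new SimpleEntry<>("ccc", 333));

    // Like reduceByKey with addition. TreeMap only fixes the print order; Spark gives none.
    static Map<String, Integer> sumByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> out = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            out.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return out;
    }

    // Like countByKey: number of elements per key, values ignored.
    static Map<String, Long> countByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Long> out = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            out.merge(p.getKey(), 1L, Long::sum);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> reduced = sumByKey(DATA);
        System.out.println(reduced);          // {aaaa=222, bbbb=666, ccc=333}
        System.out.println(countByKey(DATA)); // {aaaa=2, bbbb=3, ccc=1}
        // Counting the already-reduced pairs gives 1 per key, as rdd1.countByKey() does.
        System.out.println(countByKey(new ArrayList<>(reduced.entrySet()))); // {aaaa=1, bbbb=1, ccc=1}
    }
}
```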

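The `reduce` call in the listing concatenates keys, and string concatenation is not commutative. Spark's `reduce` reduces each partition locally and then merges the partial results on the driver as the tasks complete, so the concatenated key can come out in different orders while the integer sum stays 1221. The plain-Java model below assumes a hypothetical split of `rdd1`'s three elements into two partitions (the real partitioning depends on the job) and tries both merge orders; the class and member names are illustrative only.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model, not Spark code: reduce each partition, then merge partial results.
class ReduceMergeOrder {

    // Same combine function as in the listing: concatenate keys, add values.
    static final BinaryOperator<Map.Entry<String, Integer>> COMBINE =
        (a, b) -> new SimpleEntry<>(a.getKey() + b.getKey(), a.getValue() + b.getValue());

    // Hypothetical two-partition layout of rdd1.
    static final List<Map.Entry<String, Integer>> P0 = Arrays.<Map.Entry<String, Integer>>asList(
        new SimpleEntry<>("aaaa", 222), new SimpleEntry<>("bbbb", 666));
    static final List<Map.Entry<String, Integer>> P1 = Arrays.<Map.Entry<String, Integer>>asList(
        new SimpleEntry<>("ccc", 333));

    // Reduce each partition locally, then merge the two partial results in the given order.
    static Map.Entry<String, Integer> reduce(boolean p0FinishesFirst) {
        Map.Entry<String, Integer> r0 = P0.stream().reduce(COMBINE).get();
        Map.Entry<String, Integer> r1 = P1.stream().reduce(COMBINE).get();
        return p0FinishesFirst ? COMBINE.apply(r0, r1) : COMBINE.apply(r1, r0);
    }

    public static void main(String[] args) {
        System.out.println(reduce(true));  // aaaabbbbccc=1221
        System.out.println(reduce(false)); // cccaaaabbbb=1221
    }
}
```

Addition is both associative and commutative, which is why the value part is stable; a combine function meant for `reduce` should have both properties for all of its output.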
  

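In the listing, `take(4)` is called on an RDD with only three elements, and `takeSample(false, 3)` asks for all of them. The sketch below models only the result contracts in plain Java (no Spark; names are hypothetical): `take(n)` returns at most n elements, and sampling without replacement returns distinct elements, reproducibly when a seed is fixed. It uses `Collections.shuffle`, which is not Spark's sampling algorithm. For reproducible samples in Spark itself, the Java API also offers a `takeSample(withReplacement, num, seed)` overload.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Plain-Java model of the result contracts of take(n) and takeSample(false, n, seed).
class TakeModel {

    // take(n): the first n elements in order, or all of them if there are fewer than n.
    static <T> List<T> take(List<T> data, int n) {
        return new ArrayList<>(data.subList(0, Math.min(n, data.size())));
    }

    // takeSample(false, n, seed): up to n distinct elements chosen at random;
    // the same seed yields the same sample. (Shuffle-based, unlike Spark.)
    static <T> List<T> takeSampleWithoutReplacement(List<T> data, int n, long seed) {
        List<T> copy = new ArrayList<>(data);
        Collections.shuffle(copy, new Random(seed));
        return new ArrayList<>(copy.subList(0, Math.min(n, copy.size())));
    }

    public static void main(String[] args) {
        // String stand-ins for rdd1's three tuples.
        List<String> rdd1 = Arrays.asList("(aaaa,222)", "(bbbb,666)", "(ccc,333)");
        System.out.println(take(rdd1, 4)); // [(aaaa,222), (bbbb,666), (ccc,333)]
        System.out.println(takeSampleWithoutReplacement(rdd1, 3, 42L)); // the 3 elements in shuffled order
    }
}
```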