sparkRDD 算子的创建和使用
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了sparkRDD 算子的创建和使用相关的知识,希望对你有一定的参考价值。
Spark 是大数据领域近几年比较火的分布式计算框架(并不是一门编程语言)。它有众多的好处,比如基于内存计算、速度快。
不多说直接讲 spark的RDD 算子的使用。
如果有spark环境搭建等问题,请自行查找资料。本文不做讲述。
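Spark RDD 的创建有两种方式:
1>从集合创建,即调用 parallelize / parallelizePairs 把程序中已有的集合转换为 RDD(本文示例均采用这种方式);此外,对已有的父 RDD 执行转换算子也会得到新的 RDD。
2>从外部存储创建,例如通过 textFile 读取本地文件或 HDFS 上的文件。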
import java.util.Arrays; import java.util.Iterator; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import com.google.common.base.Optional; import scala.Tuple2; public class Demo01 { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("Demo01").setMaster("local"); JavaSparkContext jsc = new JavaSparkContext(conf); //map(jsc); //filter(jsc); // flatMap(jsc); //groupByKey(jsc); //reduceByKey(jsc); //sortByKey(jsc); //join(jsc); leftOutJoin(jsc); jsc.stop(); } //每一条元素 都乘以2,并且打印 private static void map(JavaSparkContext jsc) { //数据源 List<Integer> lst = Arrays.asList(1,2,3,4,5,6,7,8); JavaRDD<Integer> numRDD = jsc.parallelize(lst); JavaRDD<Integer> resultRDD = numRDD.map(new Function<Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer num) throws Exception { return num * 2; } }); resultRDD.foreach(new VoidFunction<Integer>() { private static final long serialVersionUID = 1L; @Override public void call(Integer num) throws Exception { System.out.println(num); } }); } // 把集合中的偶数过滤出来 private static void filter(JavaSparkContext jsc) { //数据源 List<Integer> lst = Arrays.asList(1,2,3,4,5,6,7,8); JavaRDD<Integer> numRDD = jsc.parallelize(lst); System.out.println(numRDD.filter(new Function<Integer, Boolean>() { private static final long serialVersionUID = 1L; @Override public Boolean call(Integer num) throws Exception { return num % 2 ==0; } }).collect()); } //将一行行数据的单词拆分为一个个单词 private static void flatMap(JavaSparkContext jsc) { List<String> lst = Arrays.asList("hi tim 
","hello girl","hello spark"); JavaRDD<String> lines = jsc.parallelize(lst); JavaRDD<String> resultRDD = lines.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }); System.out.println(resultRDD.collect()); } // 根据班级进行分组 private static void groupByKey(JavaSparkContext jsc) { // int ,Integer // scala 里面的类型,没有像Java这样分为基本类型和包装类,因为scala是一种更加强的面向对象语言, //一切皆对象,里面的类型,也有对应的方法可以调用,隐式转换 // 模拟数据 @SuppressWarnings("unchecked") List<Tuple2<String, Integer>> lst = Arrays.asList( new Tuple2<String, Integer>("class01", 100), new Tuple2<String, Integer>("class02",101), new Tuple2<String, Integer>("class01",199), new Tuple2<String, Integer>("class02",121), new Tuple2<String, Integer>("class02",120)); JavaPairRDD<String, Integer> classRDD = jsc.parallelizePairs(lst); JavaPairRDD<String, Iterable<Integer>> groupedRDD = classRDD.groupByKey(); groupedRDD.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Iterable<Integer>> tuple) throws Exception { String classKey = tuple._1; Iterator<Integer> values = tuple._2.iterator(); while (values.hasNext()) { Integer value = values.next(); System.out.println("key:" + classKey + "\t" + "value:" + value); } } }); } private static void reduceByKey(JavaSparkContext jsc) { @SuppressWarnings("unchecked") List<Tuple2<String, Integer>> lst = Arrays.asList( new Tuple2<String, Integer>("class01", 100), new Tuple2<String, Integer>("class02",101), new Tuple2<String, Integer>("class01",199), new Tuple2<String, Integer>("class02",121), new Tuple2<String, Integer>("class02",120)); JavaPairRDD<String, Integer> classRDD = jsc.parallelizePairs(lst); JavaPairRDD<String, Integer> resultRDD = classRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; 
@Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); resultRDD.foreach(new VoidFunction<Tuple2<String,Integer>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Integer> tuple) throws Exception { System.out.println("key:" + tuple._1 + "\t" + "value:" + tuple._2); } }); } // 把学生的成绩前3名取出来,并打印 // 1.先排序sortByKey,然后take(3),再foreach private static void sortByKey(JavaSparkContext jsc) { @SuppressWarnings("unchecked") List<Tuple2<String, Integer>> lst = Arrays.asList( new Tuple2<String, Integer>("tom", 60), new Tuple2<String, Integer>("kate",80), new Tuple2<String, Integer>("kobe",100), new Tuple2<String, Integer>("马蓉",4), new Tuple2<String, Integer>("宋哲",2), new Tuple2<String, Integer>("白百合",3), new Tuple2<String, Integer>("隔壁老王",1)); JavaPairRDD<String, Integer> classRDD = jsc.parallelizePairs(lst); JavaPairRDD<Integer, String> pairRDD = classRDD.mapToPair(new PairFunction<Tuple2<String,Integer>,Integer , String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception { return new Tuple2<Integer, String>(tuple._2, tuple._1); } }); //do no JavaPairRDD<Integer, String> sortedRDD = pairRDD.sortByKey(); JavaPairRDD<String, Integer> sortedRDD01 = sortedRDD.mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple) throws Exception { return new Tuple2<String, Integer>(tuple._2, tuple._1); } } ); // take 也是一个action操作 List<Tuple2<String, Integer>> result = sortedRDD01.take(3); System.out.println(result); } private static void join(JavaSparkContext jsc) { // 模拟数据 @SuppressWarnings("unchecked") List<Tuple2<Integer, String>> names =Arrays.asList( new Tuple2<Integer, String>(1,"jack"), new Tuple2<Integer, String>(2,"rose"), new Tuple2<Integer, String>(3,"tom"), new 
Tuple2<Integer, String>(4,"赵丽颖")); JavaPairRDD<Integer, String> num2NamesRDD = jsc.parallelizePairs(names); List<Tuple2<Integer, Integer>> scores = Arrays.asList( new Tuple2<Integer, Integer>(1,60), new Tuple2<Integer, Integer>(4,100), new Tuple2<Integer, Integer>(2,30)); JavaPairRDD<Integer, Integer> num2scoresRDD = jsc.parallelizePairs(scores); JavaPairRDD<Integer, Tuple2<Integer, String>> joinedRDD = num2scoresRDD.join(num2NamesRDD); //姓名成绩排序,取前2名 JavaPairRDD<Integer, String> score2NameRDD = joinedRDD.mapToPair(new PairFunction<Tuple2<Integer,Tuple2<Integer,String>>,Integer, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<Integer, String> call( Tuple2<Integer, Tuple2<Integer, String>> tuple) throws Exception { Integer score = tuple._2._1; String name = tuple._2._2; return new Tuple2<Integer, String>(score,name); } }); // sortByKey之后,你可以执行一个maptoPair的操作,转换为<name,score> System.out.println(score2NameRDD.sortByKey(false).take(2)); } // 学生成绩改良版 private static void leftOutJoin(JavaSparkContext jsc) { // 模拟数据 @SuppressWarnings("unchecked") List<Tuple2<Integer, String>> names =Arrays.asList( new Tuple2<Integer, String>(1,"jack"), new Tuple2<Integer, String>(2,"rose"), new Tuple2<Integer, String>(3,"tom"), new Tuple2<Integer, String>(4,"赵丽颖")); JavaPairRDD<Integer, String> num2NamesRDD = jsc.parallelizePairs(names); List<Tuple2<Integer, Integer>> scores = Arrays.asList( new Tuple2<Integer, Integer>(1,60), new Tuple2<Integer, Integer>(4,100), new Tuple2<Integer, Integer>(2,30)); JavaPairRDD<Integer, Integer> num2scoresRDD = jsc.parallelizePairs(scores); // num2scoresRDD num2NamesRDD //JavaPairRDD<Integer, Tuple2<Integer, Optional<String>>> joinedRDD = num2NamesRDD.leftOuterJoin(num2scoresRDD); // 注意join,谁join谁,没区别,但是leftoutjoin 是有顺序的 JavaPairRDD<Integer, Tuple2<String, Optional<Integer>>> joinedRDD = num2NamesRDD.leftOuterJoin(num2scoresRDD); JavaPairRDD<Integer, String> pairRDD = joinedRDD.mapToPair(new 
PairFunction<Tuple2<Integer,Tuple2<String,Optional<Integer>>>, Integer, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<Integer, String> call( Tuple2<Integer, Tuple2<String, Optional<Integer>>> tuple) throws Exception { String name = tuple._2._1; Optional<Integer> scoreOptional = tuple._2._2; Integer score = null; if(scoreOptional.isPresent()){ score= scoreOptional.get(); }else { score = 0; } return new Tuple2<Integer, String>(score, name); } }); JavaPairRDD<Integer, String> sortedRDD = pairRDD.sortByKey(false); sortedRDD.foreach(new VoidFunction<Tuple2<Integer,String>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<Integer, String> tuple) throws Exception { if(tuple._1 == 0){ System.out.println("name:" + tuple._2 + "\t" + "要努力了,你的成绩0分" ); }else{ System.out.println("姓名:" + tuple._2 + "\t" + "分数:" + tuple._1); } } }); } }
如有疑问可跟帖讨论。欢迎拍砖
本文出自 “星月情缘” 博客,请务必保留此出处http://xuegodxingyue.blog.51cto.com/5989753/1948664
以上是关于sparkRDD 算子的创建和使用的主要内容,如果未能解决你的问题,请参考以下文章
SparkRDD操作具体解释2——值型Transformation算子
pyspark3.0.0+spark3.0.0学习01(RDD的创建,RDD算子(常用Transformation算子 常用Action算子 分区操作算子)的学习及应用)