Spark Transformation Operators

Posted by huanfion

This post walks through the common Spark transformation operators (map, filter, flatMap, groupByKey, reduceByKey, sortByKey, join, and cogroup), with example code in both Java and Scala.

Java version
package com.huanfion.Spark;
 
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
 
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
 
public class TransformationJava {
    public static void main(String[] args) {
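        // Run one example at a time: only one SparkContext may be active per JVM.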
        // map();
        // filter();
        // flatMap();
        // groupByKey();
        // reduceByKey();
        // sortByKey();
        // join();
        cogroup();
    }
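    // cogroup: for each key, gather all values from both RDDs into a pair of Iterables
    // (unlike join, keys with several scores are not expanded into one row per combination).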
    public static void cogroup()
    {
        List<Tuple2<Integer, String>> stulist = Arrays.asList(
                new Tuple2<>(1,"liuda"),
                new Tuple2<>(2,"lier"),
                new Tuple2<>(3,"zhangsan"),
                new Tuple2<>(4,"gousi"),
                new Tuple2<>(5,"lily"),
                new Tuple2<>(6,"lucy"));
        List<Tuple2<Integer, Integer>> scorelist = Arrays.asList(
                new Tuple2<>(1,88),
                new Tuple2<>(2,87),
                new Tuple2<>(2,88),
                new Tuple2<>(2,97),
                new Tuple2<>(3,90),
                new Tuple2<>(3,50),
                new Tuple2<>(4,100),
                new Tuple2<>(5,58),
                new Tuple2<>(6,65));
        JavaSparkContext sc = getsc();
        JavaPairRDD<Integer, String> sturdd = sc.parallelizePairs(stulist);
        JavaPairRDD<Integer, Integer> scorerdd = sc.parallelizePairs(scorelist);

        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> cogroupvalue = sturdd.cogroup(scorerdd);
 
        cogroupvalue.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
            @Override
            public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> tuple) throws Exception {
                System.out.println(tuple._1);
                System.out.println(tuple._2._1 + ":" + tuple._2._2);
            }
        });
    }
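    // join: inner join on the student id; each result pairs a name with a score.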
    public static void join()
    {
        List<Tuple2<Integer, String>> stulist = Arrays.asList(
                new Tuple2<>(1,"liuda"),
                new Tuple2<>(2,"lier"),
                new Tuple2<>(3,"zhangsan"),
                new Tuple2<>(4,"gousi"),
                new Tuple2<>(5,"lily"),
                new Tuple2<>(6,"lucy"));
        List<Tuple2<Integer, Integer>> scorelist = Arrays.asList(
                new Tuple2<>(1,88),
                new Tuple2<>(2,87),
                new Tuple2<>(3,90),
                new Tuple2<>(4,100),
                new Tuple2<>(5,58),
                new Tuple2<>(6,65));
        JavaSparkContext sc = getsc();
        JavaPairRDD<Integer, String> sturdd = sc.parallelizePairs(stulist);
        JavaPairRDD<Integer, Integer> scorerdd = sc.parallelizePairs(scorelist);
 
        JavaPairRDD<Integer,Tuple2<String,Integer>> joinvalue=sturdd.join(scorerdd);
 
        joinvalue.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
            @Override
            public void call(Tuple2<Integer, Tuple2<String, Integer>> integerTuple2Tuple2) throws Exception {
                System.out.println(integerTuple2Tuple2._1);
                System.out.println(integerTuple2Tuple2._2._1+":"+integerTuple2Tuple2._2._2);
            }
        });
    }
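    // sortByKey(false): sort the pair RDD by key in descending order.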
    public static void sortByKey(){
        List<Tuple2<Integer, String>> list = Arrays.asList(
                new Tuple2<>(91,"liuda"),
                new Tuple2<>(78,"lier"),
                new Tuple2<>(99,"zhangsan"),
                new Tuple2<>(76,"gousi"),
                new Tuple2<>(90,"lily"),
                new Tuple2<>(89,"lucy"));
        JavaPairRDD<Integer, String> rdd = getsc().parallelizePairs(list);
        JavaPairRDD<Integer,String> sortvalue=rdd.sortByKey(false);
        sortvalue.foreach(x->System.out.println(x._1+"--"+x._2));
    }
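    // reduceByKey: merge the values of each key with the given function (here a sum);
    // with the sample data below this yields class_1 -> 276 and class_2 -> 244.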
    public static void reduceByKey(){
        List<Tuple2<String, Integer>> list = Arrays.asList(
                new Tuple2<>("class_1",91),
                new Tuple2<>("class_2",78),
                new Tuple2<>("class_1",99),
                new Tuple2<>("class_2",76),
                new Tuple2<>("class_2",90),
                new Tuple2<>("class_1",86));
        JavaPairRDD<String, Integer> rdd = getsc().parallelizePairs(list);
        JavaPairRDD<String, Integer> reducevalues = rdd.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1+v2;
            }
        });
        reducevalues.foreach(x->System.out.println(x._1+"--"+x._2));
    }
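    // groupByKey: gather all values of each key into a single Iterable; no aggregation is applied.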
    public static void groupByKey(){
        List<Tuple2<String, Integer>> list = Arrays.asList(
                new Tuple2<>("class_1",90),
                new Tuple2<>("class_2",78),
                new Tuple2<>("class_1",99),
                new Tuple2<>("class_2",76),
                new Tuple2<>("class_2",90),
                new Tuple2<>("class_1",86));
        JavaPairRDD<String, Integer> rdd = getsc().parallelizePairs(list);

        JavaPairRDD<String, Iterable<Integer>> groupvalue = rdd.groupByKey();
        groupvalue.foreach(x->System.out.println(x._1+"--"+x._2));
    }
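    // flatMap: map each element to zero or more outputs and flatten the result (here, splitting lines into words).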
    public static void flatMap(){
        List<String> list = Arrays.asList("Hadoop Hive", "Hadoop Hbase");
        JavaRDD<String> rdd = getsc().parallelize(list);
        JavaRDD<String> flatMapValue = rdd.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String value) throws Exception {
                return Arrays.asList(value.split(" ")).iterator();
            }
        });
        flatMapValue.foreach(x->System.out.println(x));
    }
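    // map: apply a function to every element, one output per input (here, value * 10).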
    public static void map(){
        JavaSparkContext sc=getsc();
 
        List<Integer> list = Arrays.asList(1, 2, 3, 4);

        JavaRDD<Integer> rdd = sc.parallelize(list);

        JavaRDD<Integer> count = rdd.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer value) throws Exception {
                return value * 10;
            }
        });
        count.foreach(x->System.out.println(x));
    }
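    // filter: keep only the elements for which the predicate is true (here, the even numbers).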
 
    public static void filter(){
        JavaSparkContext sc=getsc();
 
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        JavaRDD<Integer> rdd = sc.parallelize(list);

        JavaRDD<Integer> filterValue = rdd.filter(x -> x % 2 == 0);
 
        filterValue.foreach(x->System.out.println(x));
    }
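    // getsc: build a local-mode JavaSparkContext for the examples above.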
 
    public static JavaSparkContext getsc()
    {
        SparkConf sparkconf=new SparkConf().setAppName("Transformation").setMaster("local");
 
        JavaSparkContext sc=new JavaSparkContext(sparkconf);
 
        return sc;
    }
}

 

 
 
Scala version
package com.huanfion.Spark
 
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
 
object TransformationScala {
  def getsc():SparkContext={
    val sparkconf=new SparkConf().setAppName("Transformation").setMaster("local")
 
    val sc = new SparkContext(sparkconf)
 
    sc
  }
  def main(args: Array[String]): Unit = {
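    // Run one example at a time: only one SparkContext may be active per JVM.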
    // filter()
    // flatMap()
    // groupByKey()
    // reduceByKey()
    // sortByKey()
    join()
    // cogroup()
  }
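  // filter: keep only the elements for which the predicate is true (here, the even numbers).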
 
  def filter():Unit={
   val sc=getsc()
 
    val list= Array(1,2,3,4,5,6,7,8,9,10)
 
    val rdd=sc.parallelize(list)
 
    val count=rdd.filter(x=>x%2==0)
 
    count.foreach(x=>System.out.println(x))
  }
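  // map: apply a function to every element (here, value * 10); this method builds its own SparkContext instead of calling getsc().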
  def map():Unit={
    val sparkconf=new SparkConf().setAppName("Transformation").setMaster("local")
 
    val sc = new SparkContext(sparkconf)
 
    val list= Array(1,2,3,4,5)
 
    val rdd=sc.parallelize(list)
 
    val count=rdd.map(x=>x*10)
 
    count.foreach(x=>System.out.println(x))
  }
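  // flatMap: split each line into words and flatten the results into a single RDD.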
 
  def flatMap():Unit={
    val list=Array("Hadoop Hive", "Hadoop Hbase")
 
    val rdd=getsc().parallelize(list)
 
    val flatmapvalue=rdd.flatMap(x=>x.split(" "))
 
    flatmapvalue.foreach(x=>System.out.println(x))
  }
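  // groupByKey: gather all values of each key into one Iterable.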
 
  def groupByKey()= {
    val list=Array(
        Tuple2("class_1",90),
       Tuple2("class_2",78),
       Tuple2("class_1",99),
       Tuple2("class_2",76),
       Tuple2("class_2",90),
       Tuple2("class_1",86))
    val rdd=getsc().parallelize(list)
 
    val groupvalue=rdd.groupByKey()
 
    groupvalue.foreach(x=>{
      System.out.println(x._1)
      x._2.foreach(y=>System.out.println(y))
    })
  }
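  // reduceByKey: merge the values of each key with the given function (here _ + _), combining on each partition before the shuffle.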
 
  def reduceByKey()={
    val list=Array(
      Tuple2("class_1",90),
      Tuple2("class_2",78),
      Tuple2("class_1",99),
      Tuple2("class_2",76),
      Tuple2("class_2",90),
      Tuple2("class_1",86))
    val rdd=getsc().parallelize(list)
 
    val reducevalue=rdd.reduceByKey(_+_)
 
    reducevalue.foreach(x=>System.out.println(x._1+"--"+x._2))
  }
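  // Note: despite the method name, this example sorts by the value (the score) with sortBy, in descending order.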
 
  def sortByKey()={
    val list=Array(
      Tuple2("liuda",90),
      Tuple2("lier",78),
      Tuple2("zhangsan",99),
      Tuple2("gousi",76),
      Tuple2("lily",90),
      Tuple2("lucy",86))
    val rdd=getsc().parallelize(list)
 
    val sortvalue=rdd.sortBy(x=>x._2,false)
 
    sortvalue.foreach(x=>System.out.println(x._1+":"+x._2))
  }
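  // join: inner join on the student id; each result pairs a name with a score.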
  def join()={
    val stulist=Array(
       Tuple2(1,"liuda"),
       Tuple2(2,"lier"),
       Tuple2(3,"zhangsan"),
       Tuple2(4,"gousi"),
       Tuple2(5,"lily"),
       Tuple2(6,"lucy"));
    val scorelist=Array(
       Tuple2(1,88),
       Tuple2(2,87),
       Tuple2(3,90),
       Tuple2(4,100),
       Tuple2(5,58),
       Tuple2(6,65));
    val sc=getsc();
    val sturdd=sc.parallelize(stulist)
 
    val scorerdd=sc.parallelize(scorelist)
 
    val joinvalue=sturdd.join(scorerdd)
 
    joinvalue.foreach(x=>System.out.println(x._1+"->"+x._2))
  }
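  // cogroup: for each key, collect all values from both RDDs into a pair of Iterables.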
 
  def cogroup()={
    val stulist=Array(
      Tuple2(1,"liuda"),
      Tuple2(2,"lier"),
      Tuple2(3,"zhangsan"),
      Tuple2(4,"gousi"),
      Tuple2(5,"lily"),
      Tuple2(6,"lucy"));
    val scorelist=Array(
      Tuple2(1,88),
      Tuple2(2,87),
      Tuple2(2,84),
      Tuple2(2,86),
      Tuple2(3,90),
      Tuple2(3,65),
      Tuple2(4,100),
      Tuple2(5,58),
      Tuple2(6,65));
    val sc=getsc();
    val sturdd=sc.parallelize(stulist)
 
    val scorerdd=sc.parallelize(scorelist)
 
    val joinvalue=sturdd.cogroup(scorerdd)
 
    joinvalue.foreach(x=>System.out.println(x._1+"->"+x._2._1.toList+":"+x._2._2.toList))
  }
}

 

 
