Simple Examples of Spark's Common Transformation Operators

Posted by 大冰的小屋


Spark's commonly used Transformation operators include map, filter, flatMap, reduceByKey, groupByKey, join, leftOuterJoin, rightOuterJoin, and cogroup. This post walks through a simple example of each; as my study of Spark deepens, each operator deserves a more thorough analysis in later posts.

package com.spark.App

import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by Administrator on 2016/8/13 0013.
  */
object Transformations {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Transformations").setMaster("local")
    val sc = new SparkContext(conf)

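    // Uncomment one of the calls below to run the corresponding example;
    // as written, only the cogroup example runs.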
//    mapTransformation(sc)
//    filterTransformation(sc)
//    flatMapTransformation(sc)
//    reduceByKeyTransformation(sc)
//    groupByKeyTransformation(sc)
//    joinTransformation(sc)
//    leftOuterJoinTransformation(sc)
//    rightOuterJoinTransformation(sc)
    cogroupTransformation(sc)

    sc.stop()
  }


  /**
    * Apply a function to each element of the RDD; the return values form the new RDD.
    * @param sc
    */
  def mapTransformation(sc: SparkContext): Unit = {
    val numbers = sc.parallelize(1 to 10)   // build an RDD from a local collection
    val mapped = numbers.map(item => 2 * item)  // double each element
    mapped.collect.foreach(println)
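    // Prints the doubled values in order, one per line: 2, 4, 6, ..., 20.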
  }

  /**
    * Return an RDD made up of the elements that pass the function given to filter();
    * only elements for which the function returns true are kept.
    * @param sc
    */
  def filterTransformation(sc: SparkContext): Unit = {
    val numbers = sc.parallelize(1 to 10)
    val filtered = numbers.filter(item => item % 2 == 0)   // keep only the even numbers
    filtered.collect.foreach(println)
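    // Prints 2, 4, 6, 8, 10, one per line.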
  }


  /**
    * Apply a function to each element of the RDD; the contents of all the returned
    * iterators form the new RDD. Commonly used to split lines into words.
    * @param sc
    */
  def flatMapTransformation(sc: SparkContext): Unit = {
    val lines = Array("What makes life dreary is the want of motive", "Hello Spark", "Hello World")
    val linesRDD = sc.parallelize(lines)
    val words = linesRDD.flatMap(line => line.split(" "))   // one output element per word
    words.collect.foreach(println)
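    // Prints every word on its own line, in order: What, makes, life, ..., Hello, World.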
  }

  /**
    * reduceByKey applies only to RDDs of (key, value) pairs; it merges the values of
    * all elements that share the same key. The merge runs locally within each partition
    * first (map-side combine), and the merge function is user-defined.
    * @param sc
    */
  def reduceByKeyTransformation(sc: SparkContext): Unit = {
    val lines = Array("What makes life dreary is the want of motive", "Hello Spark", "Hello World")
    val linesRDD = sc.parallelize(lines)
    val words = linesRDD.flatMap(_.split(" ")).map(word => (word, 1))   // emit (word, 1) pairs
    val wordsCount = words.reduceByKey(_ + _)   // sum the counts for each word
    wordsCount.collect.foreach(println)
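    // Prints one (word, count) pair per line, e.g. (Hello,2), (Spark,1), (World,1);
    // the ordering of the pairs may vary.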
  }

  /**
    * groupByKey also combines the values for each key, but it only produces one
    * sequence per key; unlike reduceByKey, it takes no user-defined merge function.
    * @param sc
    */
  def groupByKeyTransformation(sc: SparkContext): Unit = {
    val data = Array(Tuple2("David", "Math"), Tuple2("David", "Music"),
                      Tuple2("Mary", "Math"), Tuple2("Mary", "Art"),
                      Tuple2("Allin", "Computer"))
    val dataRDD = sc.parallelize(data)
    val grouped = dataRDD.groupByKey()
    grouped.collect.foreach(println)
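    // Prints e.g. (David,CompactBuffer(Math, Music)), (Mary,CompactBuffer(Math, Art)),
    // (Allin,CompactBuffer(Computer)); the ordering of the keys may vary.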
  }

  /**
    * Perform an inner join on two RDDs.
    * Only keys that exist in both RDDs appear in the output.
    * @param sc
    */
  def joinTransformation(sc: SparkContext): Unit = {
    val idAndName = Array(Tuple2(1, "David"), Tuple2(2, "Mary"), Tuple2(3, "Allin"))
    val idAndScore = Array(Tuple2(1, 98), Tuple2(2, 90), Tuple2(3, 86))

    val names = sc.parallelize(idAndName)
    val scores = sc.parallelize(idAndScore)
    val nameAndScore = names.join(scores)
    nameAndScore.collect.foreach(println)
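    // Prints (1,(David,98)), (2,(Mary,90)), (3,(Allin,86)); the ordering may vary.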
  }

  /**
    * Join two RDDs, keeping every key from the first RDD.
    * Keys with no match in the second RDD get the value None. The result here is:
    * (1,(David,Some(98)))
    * (3,(Allin,None))
    * (2,(Mary,Some(90)))
    * @param sc
    */
  def leftOuterJoinTransformation(sc: SparkContext): Unit = {
    val idAndName = Array(Tuple2(1, "David"), Tuple2(2, "Mary"), Tuple2(3, "Allin"))
    val idAndScore = Array(Tuple2(1, 98), Tuple2(2, 90))

    val names = sc.parallelize(idAndName)
    val scores = sc.parallelize(idAndScore)
    val nameAndScore = names.leftOuterJoin(scores)
    nameAndScore.collect.foreach(println)
  }

  /**
    * Join two RDDs, keeping every key from the second RDD; the mirror image of leftOuterJoin.
    * Output:
    * (1,(Some(David),98))
    * (3,(None,86))
    * (2,(Some(Mary),90))
    * @param sc
    */
  def rightOuterJoinTransformation(sc: SparkContext): Unit = {
    val idAndName = Array(Tuple2(1, "David"), Tuple2(2, "Mary"))
    val idAndScore = Array(Tuple2(1, 98), Tuple2(2, 90), Tuple2(3, 86))

    val names = sc.parallelize(idAndName)
    val scores = sc.parallelize(idAndScore)
    val nameAndScore = names.rightOuterJoin(scores)
    nameAndScore.collect.foreach(println)
  }

  /**
    * Group the data that shares the same key across the two RDDs.
    * Output:
    * (1,(CompactBuffer(David, Frank),CompactBuffer(98, 93)))
    * (3,(CompactBuffer(Allin, Carry),CompactBuffer(86, 83)))
    * (2,(CompactBuffer(Mary, Duncan),CompactBuffer(90, 80)))
    * @param sc
    */
  def cogroupTransformation(sc: SparkContext): Unit = {
    val idAndName = Array(Tuple2(1, "David"), Tuple2(1, "Frank"),
                          Tuple2(2, "Mary"), Tuple2(2, "Duncan"),
                          Tuple2(3, "Allin"), Tuple2(3, "Carry"))
    val idAndScore = Array(Tuple2(1, 98), Tuple2(1, 93),
                            Tuple2(2, 90), Tuple2(2, 80),
                            Tuple2(3, 86), Tuple2(3, 83))

    val names = sc.parallelize(idAndName)
    val scores = sc.parallelize(idAndScore)
    val nameAndScore = names.cogroup(scores)
    nameAndScore.collect.foreach(println)
  }
}
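Since the master is hard-coded to local, the object can be run straight from an IDE. The method bodies can also be pasted one by one into spark-shell, which already provides a SparkContext as sc; in that case, skip the SparkConf/SparkContext setup and the sc.stop() call.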
