Spark 算子之map使用

Posted 逆风飞翔的小叔

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 算子之map使用相关的知识,希望对你有一定的参考价值。

前言


算子是spark中处理数据的重要的计算单元,RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value类型;

Map使用

map在spark的数据转换中有着重要的作用,开发中几乎离不开map的使用,基本语法:

def map[U: ClassTag](f: T => U ): RDD[ U ]

函数说明:

  • 将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换

案例1

自定义一个List集合,将集合中的每个元素前面拼接一个字符串

import org.apache.spark.SparkConf, SparkContext
import org.apache.spark.rdd.RDD

object MapRdd_1 

  def main(args: Array[String]): Unit = 

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount");
    // 创建 Spark 上下文环境对象(连接对象)
    val sparkContext: SparkContext = new SparkContext(sparkConf);

    val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4))

    val dataRDD1: RDD[String] = dataRDD.map(
      num => 
        "hello : " + num
      
    )
    dataRDD1.collect().foreach(println)
    sparkContext.stop()

  

mapPartitions

与map对应的,spark提供了mapPartitions这个算子,和map不同的是,这个mapPartitions带有分区的概念,

def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U] , preservesPartitioning: Boolean = false): RDD[U]

函数说明:
  • 将待处理的数据 以分区为单位 发送到计算节点进行处理,这里的处理是指可以进行任意的处 理,哪怕是过滤数据;

案例2:

获取每个分区的最大值
import org.apache.spark.SparkConf, SparkContext
import org.apache.spark.rdd.RDD

object MapRdd_3 

  def main(args: Array[String]): Unit = 

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount");
    // 创建 Spark 上下文环境对象(连接对象)
    val sparkContext: SparkContext = new SparkContext(sparkConf);

    val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4),2)

    //求每个分区的最大值
    val result = dataRDD.mapPartitions(
      iter =>
        List(iter.max).iterator
      
    )
    result.collect().foreach(println)
    sparkContext.stop()

  

map mapPartitions 的区别

1、数据处理角度

  • Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作;

2、功能的角度

  • Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据,MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据

3、性能的角度

  • Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,那么这样会导致内存可能 不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用,而使用 map 操作;

mapPartitionsWithIndex

带有分区编号的map算子

def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U] , preservesPartitioning: Boolean = false): RDD[U]

函数说明:

  • 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处 理,哪怕是过滤数据,在处理时同时可以获取当前分区索引;

案例3:

输出第一个分区的数据
import org.apache.spark.SparkConf, SparkContext

object MapRdd_4 

  def main(args: Array[String]): Unit = 

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(1,2,3,4), 2)
    // 【1,2】,【3,4】
    val mpiRDD = rdd.mapPartitionsWithIndex(
      (index, iter) => 
        if ( index == 1 ) 
          iter
         else 
          Nil.iterator
        
      
    )
    mpiRDD.collect().foreach(println)
    sc.stop()
  


flatMap

将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射
def flatMap[U: ClassTag](f: T => TraversableOnce[U] ): RDD[U]

案例4:

将一个集合中带有另外两个集合类型的元素打散,以单个元素的方式输出
import org.apache.spark.SparkConf, SparkContext

object MapRdd_6 

  def main(args: Array[String]): Unit = 
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(
      List(1,2),List(3,4)
    ))
    val resultRDD = rdd.flatMap(
     list =>
       list
     
    )
    resultRDD.collect().foreach(println)

    println("===============")

    val rddWords = sc.makeRDD(List(
      "hello word","hello spark"
    ))

    val wordResult = rddWords.flatMap(
      s =>
        s.split(" ")
      
    )
    wordResult.collect().foreach(println)

    sc.stop()
  


案例5:

将一个集合中带有另外两个集合类型的元素,以及一个单个元素扁平化,以单个元素的方式输出

import org.apache.spark.SparkConf, SparkContext

object MapRdd_7 

  def main(args: Array[String]): Unit = 
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(
      List(1,2),3,List(3,4)
    ))

    val resultRDD = rdd.flatMap(
      data =>
        data match 
          case list : List[_] => list
          case ele => List(ele)
        
      
    )
    resultRDD.collect().foreach(println)
    sc.stop()
  


以上是关于Spark 算子之map使用的主要内容,如果未能解决你的问题,请参考以下文章

[Spark精进]必须掌握的4个RDD算子之map算子

spark记录spark算子之Transformation

[Spark精进]必须掌握的4个RDD算子之filter算子

[Spark精进]必须掌握的4个RDD算子之filter算子

[Spark精进]必须掌握的4个RDD算子之flatMap算子

[Spark精进]必须掌握的4个RDD算子之flatMap算子