Spark 算子之map使用
Posted 逆风飞翔的小叔
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 算子之map使用相关的知识,希望对你有一定的参考价值。
前言
算子是spark中处理数据的重要的计算单元,RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value类型;
Map使用
map在spark的数据转换中有着重要的作用,开发中几乎离不开map的使用,基本语法:
def map[U: ClassTag](f: T => U ): RDD[ U ]
函数说明:
- 将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换
案例1
自定义一个List集合,将集合中的每个元素前面拼接一个字符串
import org.apache.spark.SparkConf, SparkContext
import org.apache.spark.rdd.RDD
object MapRdd_1
def main(args: Array[String]): Unit =
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount");
// 创建 Spark 上下文环境对象(连接对象)
val sparkContext: SparkContext = new SparkContext(sparkConf);
val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4))
val dataRDD1: RDD[String] = dataRDD.map(
num =>
"hello : " + num
)
dataRDD1.collect().foreach(println)
sparkContext.stop()
mapPartitions
与map对应的,spark提供了mapPartitions这个算子,和map不同的是,这个mapPartitions带有分区的概念,
def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U] , preservesPartitioning: Boolean = false): RDD[U]函数说明:
- 将待处理的数据 以分区为单位 发送到计算节点进行处理,这里的处理是指可以进行任意的处 理,哪怕是过滤数据;
案例2:
获取每个分区的最大值import org.apache.spark.SparkConf, SparkContext
import org.apache.spark.rdd.RDD
object MapRdd_3
def main(args: Array[String]): Unit =
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount");
// 创建 Spark 上下文环境对象(连接对象)
val sparkContext: SparkContext = new SparkContext(sparkConf);
val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4),2)
//求每个分区的最大值
val result = dataRDD.mapPartitions(
iter =>
List(iter.max).iterator
)
result.collect().foreach(println)
sparkContext.stop()
map 和 mapPartitions 的区别?
1、数据处理角度
- Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作;
2、功能的角度
- Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据,MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据
3、性能的角度
- Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,那么这样会导致内存可能 不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用,而使用 map 操作;
mapPartitionsWithIndex
带有分区编号的map算子
def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U] , preservesPartitioning: Boolean = false): RDD[U]
函数说明:
- 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处 理,哪怕是过滤数据,在处理时同时可以获取当前分区索引;
案例3:
输出第一个分区的数据import org.apache.spark.SparkConf, SparkContext
object MapRdd_4
def main(args: Array[String]): Unit =
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4), 2)
// 【1,2】,【3,4】
val mpiRDD = rdd.mapPartitionsWithIndex(
(index, iter) =>
if ( index == 1 )
iter
else
Nil.iterator
)
mpiRDD.collect().foreach(println)
sc.stop()
flatMap
将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射def flatMap[U: ClassTag](f: T => TraversableOnce[U] ): RDD[U]
案例4:
将一个集合中带有另外两个集合类型的元素打散,以单个元素的方式输出import org.apache.spark.SparkConf, SparkContext
object MapRdd_6
def main(args: Array[String]): Unit =
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(
List(1,2),List(3,4)
))
val resultRDD = rdd.flatMap(
list =>
list
)
resultRDD.collect().foreach(println)
println("===============")
val rddWords = sc.makeRDD(List(
"hello word","hello spark"
))
val wordResult = rddWords.flatMap(
s =>
s.split(" ")
)
wordResult.collect().foreach(println)
sc.stop()
案例5:
将一个集合中带有另外两个集合类型的元素,以及一个单个元素扁平化,以单个元素的方式输出import org.apache.spark.SparkConf, SparkContext
object MapRdd_7
def main(args: Array[String]): Unit =
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(
List(1,2),3,List(3,4)
))
val resultRDD = rdd.flatMap(
data =>
data match
case list : List[_] => list
case ele => List(ele)
)
resultRDD.collect().foreach(println)
sc.stop()
以上是关于Spark 算子之map使用的主要内容,如果未能解决你的问题,请参考以下文章
[Spark精进]必须掌握的4个RDD算子之filter算子
[Spark精进]必须掌握的4个RDD算子之filter算子