Spark之RDD算子-转换算子

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark之RDD算子-转换算子相关的知识,希望对你有一定的参考价值。

参考技术A

转换(Transformation)算子 就是对RDD进行操作的接口函数,其作用是将一个或多个RDD变换成新的RDD。

使用Spark进行数据计算,在利用创建算子生成RDD后,数据处理的算法设计和程序编写的最关键部分,就是利用变换算子对原始数据产生的RDD进行一步一步的变换,最终得到期望的计算结果。

对于变换算子可理解为分两类:1,对Value型RDD进行变换的算子;2,对Key/Value型RDD进行变换算子。在每个变换中有仅对一个RDD进行变换的,也有是对两个RDD进行变换的。

将当前RDD进行重新分区,生成一个以numPartitions参数指定的分区数存储的新RDD。参数shuffle为true时在变换过程中进行shuffle操作,否则不进行shuffle。

在Linux系统中,有许多对数据进行处理的shell命令,我们可能通过pipe变换将一些shell命令用于Spark中生成新的RDD。

对原RDD中的元素按照函数f指定的规则进行排序,并可通过ascending参数进行升序或降序设置,排序后的结果生成新的RDD,新的RDD的分区数量可以由参数numPartitions指定,默认与原RDD相同的分区数。

输入参数为另一个RDD,返回两个RDD中所有元素的笛卡尔积。

输入参数为另一个RDD,返回原始RDD与输入参数RDD的补集,即生成由原始RDD中而不在输入参数RDD中的元素构成新的RDD,参数numPartitions指定新RDD分区数。

返回原始RDD与另一个RDD的并集。

生成由原始RDD的值为Key,另一个RDD的值为Value依次配对构成的所有Key/Value对,并返回这些Key/Value对集合构成的新RDD

将Key/Value型RDD中的元素的Key提取出来,所有Key值构成一个序列形成新的RDD。

将Key/Value型RDD中的元素的Value值使用输入参数函数f进行变换构成一个新的RDD。

[Spark精进]必须掌握的4个RDD算子之mapPartitions算子

返回第一章

第二个mapPartitions:以数据分区为粒度的数据转换

按照介绍算子的惯例,我们还是先来说说 mapPartitions 的用法。mapPartitions,顾名思义,就是以数据分区为粒度,使用映射函数 f 对 RDD 进行数据转换。对于上述单词哈希值计数的例子,我们结合后面的代码,来看看如何使用 mapPartitions 来改善执行性能:


// 把普通RDD转换为Paired RDD
 
import java.security.MessageDigest
 
val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码
 
val kvRDD: RDD[(String, Int)] = cleanWordRDD.mapPartitions( partition => {
  // 注意!这里是以数据分区为粒度,获取MD5对象实例
  val md5 = MessageDigest.getInstance("MD5")
  val newPartition = partition.map( word => {
  // 在处理每一条数据记录的时候,可以复用同一个Partition内的MD5对象
    md5.digest(word.getBytes).mkString
  })
  newPartition
})

可以看到,在上面的改进代码中,mapPartitions 以数据分区(匿名函数的形参 partition)为粒度,对 RDD 进行数据转换。具体的数据处理逻辑,则由代表数据分区的形参 partition 进一步调用 map(f) 来完成。你可能会说:“partition. map(f) 仍然是以元素为粒度做映射呀!这和前一个版本的实现,有什么本质上的区别呢?”
仔细观察,你就会发现,相比前一个版本,我们把实例化 MD5 对象的语句挪到了 map 算子之外。如此一来,以数据分区为单位,实例化对象的操作只需要执行一次,而同一个数据分区中所有的数据记录,都可以共享该 MD5 对象,从而完成单词到哈希值的转换。
通过下图的直观对比,你会发现,以数据分区为单位,mapPartitions 只需实例化一次 MD5 对象,而 map 算子却需要实例化多次,具体的次数则由分区内数据记录的数量来决定。


对于一个有着上百万条记录的 RDD 来说,其数据分区的划分往往是在百这个量级,因此,相比 map 算子,mapPartitions 可以显著降低对象实例化的计算开销,这对于 Spark 作业端到端的执行性能来说,无疑是非常友好的。
实际上。除了计算哈希值以外,对于数据记录来说,凡是可以共享的操作,都可以用 mapPartitions 算子进行优化。这样的共享操作还有很多,比如创建用于连接远端数据库的 Connections 对象,或是用于连接 Amazon S3 的文件系统句柄,再比如用于在线推理的机器学习模型,等等,不一而足。你不妨结合实际工作场景,把你遇到的共享操作整理到留言区,期待你的分享。
相比 mapPartitions,mapPartitionsWithIndex 仅仅多出了一个数据分区索引,这个数据分区索引可以为我们获取分区编号,当你的业务逻辑中需要使用到分区编号的时候,不妨考虑使用这个算子来实现代码。除了这个额外的分区索引以外,mapPartitionsWithIndex 在其他方面与 mapPartitions 是完全一样的。
介绍完 map 与 mapPartitions 算子之后,接下来,我们趁热打铁,再来看一个与这两者功能类似的算子:flatMap。

点击跳转到下一讲

以上是关于Spark之RDD算子-转换算子的主要内容,如果未能解决你的问题,请参考以下文章

[Spark精进]必须掌握的4个RDD算子之map算子

[Spark精进]必须掌握的4个RDD算子之map算子

[Spark精进]必须掌握的4个RDD算子之mapPartitions算子

[Spark精进]必须掌握的4个RDD算子之mapPartitions算子

Spark算子

spark算子介绍