Spark的算子(函数)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark的算子(函数)相关的知识,希望对你有一定的参考价值。

参考技术A

RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。

RDD通过 persist 方法或 cache 方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

通过查看源码发现cache终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark 的存储级别还有好多种,存储级别在object StorageLevel中定义的。

缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使 缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各 个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
Demo示例:

容错机制

分别举例说明:
本地目录 注意:这种模式,需要将spark-shell运行在本地模式上

HDFS的目录 注意:这种模式,需要将spark-shell运行在集群模式上

源码中的一段话

Lineage(血统)

RDD的依赖关系

窄依赖指的是每一个父RDD的Partition多被子RDD的一个Partition使用

宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition

Spark任务中的Stage DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就就形成了DAG,根据 RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完 成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计 算, 因此宽依赖是划分Stage的依据 。

把每个partition中的分区号和对应的值拿出来

接收一个函数参数:

创建一个函数返回RDD中的每个分区号和元素:

调用:

先对局部聚合,再对全局聚合

查看每个分区中的元素:

将每个分区中的大值求和,注意:初始值是0;

如果初始值时候10,则结果为:30

如果是求和,注意:初始值是0:

如果初始值是10,则结果是:45

一个字符串的例子:

修改一下刚才的查看分区元素的函数

两个分区中的元素:

运行结果:

更复杂一点的例子

结果可能是:”24”,也可能是:”42”

结果是:”10”,也可能是”01”

原因:注意有个初始值””,其长度0,然后0.toString变成字符串

结果是:”11”,原因同上。

准备数据:

两个分区中的元素:

示例:将每个分区中的动物多的个数求和

将每种动物个数求和

这个例子也可以使用:reduceByKey

都是将RDD中的分区进行重分区。 区别是:coalesce默认不会进行shuffle(false);而repartition会进行shuffle(true),即:会将数 据真正通过网络进行重分区。
示例:

下面两句话是等价的:

Spark算子篇 --Spark算子之aggregateByKey详解

一。基本介绍

rdd.aggregateByKey(3, seqFunc, combFunc) 其中第一个函数是初始值

3代表每次分完组之后的每个组的初始值。

seqFunc代表combine的聚合逻辑

每一个mapTask的结果的聚合成为combine

combFunc reduce端大聚合的逻辑

ps:aggregateByKey默认分组

二。代码

from pyspark import SparkConf,SparkContext
from __builtin__ import str
conf = SparkConf().setMaster("local").setAppName("AggregateByKey")
sc = SparkContext(conf = conf)

rdd = sc.parallelize([(1,1),(1,2),(2,1),(2,3),(2,4),(1,7)],2)

def f(index,items):
    print "partitionId:%d" %index
    for val in items:
        print val
    return items
    
rdd.mapPartitionsWithIndex(f, False).count()


def seqFunc(a,b):
    print "seqFunc:%s,%s" %(a,b)
    return max(a,b) #取最大值
def combFunc(a,b):
    print "combFunc:%s,%s" %(a ,b)
    return a + b #累加起来
‘‘‘
    aggregateByKey这个算子内部肯定有分组
‘‘‘
aggregateRDD = rdd.aggregateByKey(3, seqFunc, combFunc)
rest = aggregateRDD.collectAsMap()
for k,v in rest.items():
    print k,v

sc.stop()

 

三。详细逻辑

技术分享图片

PS:

seqFunc函数 combine篇。

3是每个分组的最大值,所以把3传进来,在combine函数中也就是seqFunc中第一次调用 3代表a,b即1,max(a,b)即3 第二次再调用则max(3.1)中的最大值3即输入值,2即b值 所以结果则为(1,3)

底下类似。combine函数调用的次数与分组内的数据个数一致。

 

combFunc函数 reduce聚合

在reduce端大聚合拉完数据后也是先分组,然后再调用combFunc函数

四。结果

技术分享图片

持续更新中。。。。,欢迎大家关注我的公众号LHWorld.

技术分享图片

 


以上是关于Spark的算子(函数)的主要内容,如果未能解决你的问题,请参考以下文章

Spark算子篇 --Spark算子之combineByKey详解

Spark算子篇 --Spark算子之aggregateByKey详解

Spark 算子

spark算子 分为3大类

[Spark精进]必须掌握的4个RDD算子之map算子

[Spark精进]必须掌握的4个RDD算子之map算子