spark知识点_RDD

Posted 小熊_看看

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark知识点_RDD相关的知识,希望对你有一定的参考价值。

  来自官网的Spark Programming Guide,包括个人理解的东西。

  RDD是spark中最重要的抽象概念(数据结构),是集群中各节点上并行处理的分隔元素的集合(汇总),总会用到collect()方法。

  RDD可以从Hadoop文件系统中的文件创建,也可以从执行程序中的Scala集合中创建或转换。spark可以在内存中留存一份RDD,方便在并行运算中高效重用。

  还有个抽象概念,共享变量。spark在不同的节点并行执行任务集时,需要把每个变量的副本传送一份到每个任务中,有时候变量需要在任务中共享。

  共享变量有两种:广播变量(Broadcast Variables)和累加器(Accumulators)。前者缓存在所有节点的内存中,后者用来叠加计数或求和。

  Spark2.2.0可以使用标准的CPython接口,故C库如Numpy可以使用,Pandas亦可。

  1)spark程序的第一件事是创建一个spark上下文对象,其中,先要配置自己的应用信息。

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName(myFirstAPP).setMaster(local[*])
sc = SparkContext(conf=conf)

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)  
#SparkContext.parallelize()用于将本地Python集合分布式处理为RDD格式,以便并行处理。可以设置分隔的数量,如
sc.parallelize(data,6)
#即,要想并行处理,数据必须要是RDD或DataSets或DataFrame格式。数据转换成这些格式后,就可以使用C库包来进行其他运算操作。

  2)外部文件,spark支持文本文件、序列文件及其他Hadoop输入格式。

distFile = sc.textFile("data.txt")  #文本文件,以行集合的格式读取
distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b) #textFile可使用DataSets的操作

  3)RDD操作:两种操作Transformation(从现存数据集中创建新的数据集DataSets)和Action(执行运算后将值返回给执行程序)。比如,map是transformation,reduce是action。

  所有的transformation都是‘懒的’,只记忆并不执行,只有当action需要返回值给执行程序时才执行计算,这样spark可以更高效。这样只会返回reduce结果,而没有庞大的map数据集。

  但是,如果有多个reduce,那么每次都要重新map,解决方法是:可以通过persist (or cache)方法将RDD留存在内存中。

  

lines = sc.textFile("data.txt")   #此处只创建一个指针
lineLengths = lines.map(lambda s: len(s))    #此处未计算
lineLengths.persist()  #留存,以重用
totalLength = lineLengths.reduce(lambda a, b: a + b)   #此处开始计算,只返回计算结果。任务在多台机器上运行,每台机器只负责自己的map部分及本地reduce,并返回自己的值给执行程序。

  4)传递函数给spark:lambda表达式(不支持多语句,且要求有返回值),本地自定义def(适用于长代码),模块的Top-level函数。

"""MyScript.py"""
if __name__ == "__main__":
    def myFunc(s):
        words = s.split(" ")
        return len(words)   #分隔后返回长度。s是文件数据,下面的textFile是RDD
#关于if __name__=="__main__"这种写法的用处,前面必然定义了一些函数,那么只在本程序中执行时运行该段代码,载入到其他程序时,就可以只用所定义的函数,而不会执行该段代码

    sc = SparkContext(...)
    sc.textFile("file.txt").map(myFunc)

  注意:如果创建新的MyClass并调用doStuff()时,需要调用self.field,这样就需要把整个对象传送到集群中。把field复制到本地变量中可避免该情况。

class MyClass(object):
    def __init__(self):
        self.field = "Hello"
    def doStuff(self, rdd):
        return rdd.map(lambda s: self.field + s) 
def doStuff(self, rdd):   #复制field到本地变量
    field = self.field
    return rdd.map(lambda s: field + s)

  5)理解闭包:全局变量需要聚合时,建议使用Accumulator(累加器)。

counter = 0
rdd = sc.parallelize(data)

# Wrong: Don‘t do this!!
def increment_counter(x):
    global counter
    counter += x
rdd.foreach(increment_counter)

print("Counter value: ", counter)

  本地模式(使用相同的JVM)时可能可以执行,但集群模式就不会如预期般执行。执行之前,spark会计算任务的(序列化)闭包(对每个执行器都可见的变量或方法),但counter变量传递给执行器的是副本(copies),当foreach方法引用counter时,这已经不是执行节点的counter,而是工作节点的counter,那么最终counter可能还是0。

  执行节点(driver node)执行程序存在的地方,工作节点(work node)把任务分发到集群中的地方。

  此外,想要使用rdd.foreach(println) 或rdd.map(println)打印时,并不能实现预期效果。因为闭包模式中,stdout在工作节点的执行器中,并不在执行节点,故需要先使用collect()将所有元素汇总到执行节点。但把所有元素汇总到一台机器上可能会内存溢出,解决方法是take()rdd.take(100).foreach(println),只打印部分元素。

  6)用键值对进行操作:reduceByKey,sortByKey。键值对可使用Python內建的tuple轻松获得。

lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)  #统计该文件每行的值出现几次,大概有重复的行
counts.collect()

  7)常见transformation和action。列出常用操作,知道都能实现哪些功能。

map(func) 经过func映射后,返回新的分布式数据集
filter(func) 返回新的数据集,由func为True时的元素的组成。过滤
flatMap(func) 类map,但每个输入项可映射到0或多个输出项,故func返回的是个序列
mapPartitions(func) 类map,在RDD的每个分区上分别执行,那么func的类型必须是迭代器Iterator<T> => Iterator<U>
mapPartitionsWithIndex(func) func提供整型值来表示分区的index,func的类型(Int, Iterator<T>) => Iterator<U>
sample(withReplacementfractionseed) 采样数据的fraction部分,可替换可不替换,随机数种子
union(otherDataset) 返回新的数据集,包括源数据和其他数据的元素,联合
intersection(otherDataset) 插入
distinct([numTasks])) 去重
groupByKey([numTasks]) 分组,note:若分组后要聚合,那么直接使用reduceByKey()或aggregateByKey()效率更高。任务数可选
reduceByKey(func, [numTasks]) 聚合
aggregateByKey(zeroValue)(seqOpcombOp, [numTasks]) 聚合
sortByKey([ascending], [numTasks]) 排序
join(otherDataset, [numTasks]) 连接, (K, V) and (K, W)->(K,(V,W))。外连接leftOuterJoinrightOuterJoin, and fullOuterJoin
cogroup(otherDataset, [numTasks])  (K, V) and (K, W)->(K, (Iterable<V>, Iterable<W>)) tuples
cartesian(otherDataset) 用于T和U类型RDD时,返回(T, U)对(类型键值对RDD)。笛卡尔的(笛卡尔乘积?)
pipe(command[envVars]) 通过shell命令管道处理每个RDD分片
coalesce(numPartitions)  
repartition(numPartitions)  
repartitionAndSortWithinPartitions(partitioner)  

 

  

  

 

以上是关于spark知识点_RDD的主要内容,如果未能解决你的问题,请参考以下文章

Spark系列

Spark大型电商项目实战-及其改良 比对sparkSQL和纯RDD实现的结果

大数据技术之_19_Spark学习_02_Spark Core 应用解析+ RDD 概念 + RDD 编程 + 键值对 RDD + 数据读取与保存主要方式 + RDD 编程进阶 + Spark Cor

Spark——RDD算子

Spark Rdd DataFrame操作汇总

spark基础知识三