Spark编程基础总结

Posted Alex_MaHao

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark编程基础总结相关的知识,希望对你有一定的参考价值。

初始化Spark

// 创建spark配置
val conf = new SparkConf().setAppName(appName).setMaster(master)
// SparkContext上下文对象
new SparkContext(conf)

RDDS

Spark 核心的概念是 Resilient Distributed Dataset (RDD):一个可并行操作的有容错机制的数据集合。

有 2 种方式创建 RDDs:

  • 第一种是在你的驱动程序中并行化一个已经存在的集合;

  • 另外一种是引用一个外部存储系统的数据集,例如共享的文件系统,HDFS,HBase或其他 Hadoop 数据格式的数据源。

并行集合

并行集合 (Parallelized collections) 的创建是通过在一个已有的集合(Scala Seq )上调用 SparkContext 的 parallelize 方法 实现的。集合中的元素被复制到一个可并行操作的分布式数据集中。

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

我们可以调用 distData.reduce((a, b) => a + b) 将这个数组中的元素相加

并行集合一个很重要的参数是切片数(slices),表示一个数据集切分的份数。Spark 会在集群上为每一个切片运行一个任务。 你可以在集群上为每个 CPU 设置 2-4 个切片(slices)。正常情况下,Spark 会试着基于你的集群状况自动地设置切片的数 目。然而,你也可以通过 parallelize 的第二个参数手动地设置(例如: sc.parallelize(data, 10) )。

外部数据集

Spark采用textFile()方法来从文件系统中加载数据创建RDD,该方法把文件的URI作为参数,这个URI可以是本地文件系统的地址,或者是分布式文件系统HDFS的地址,或者是Amazon S3的地址等等。

val distFile = sc.textFile("data.txt")
>>> lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
>>> lines = sc.textFile("/user/hadoop/word.txt")
>>> lines = sc.textFile("word.txt")

RDD 操作

转换操作

对于RDD而言,每一次转换操作都会产生不同的RDD,供给下一个“转换”使用。转换得到的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作。

常见的转换操作如下

  • filter(func):筛选出满足函数func的元素,并返回一个新的数据集

  • map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集

  • flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果

  • groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集

  • reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合

行动操作

行动操作是真正触发计算的地方。Spark程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。

  • count() 返回数据集中的元素个数

  • collect() 以数组的形式返回数据集中的所有元素

  • first() 返回数据集中的第一个元素

  • take(n) 以数组的形式返回数据集中的前n个元素

  • reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素

  • foreach(func) 将数据集中的每个元素传递到函数func中运行

惰性机制

>>> lines = sc.textFile("data.txt")
>>> lineLengths = lines.map(lambda s : len(s))
>>> totalLength = lineLengths.reduce( lambda a, b : a + b)

上面第一行首先从外部文件data.txt中构建得到一个RDD,名称为lines,但是,由于textFile()方法只是一个转换操作,因此,这行代码执行后,不会立即把data.txt文件加载到内存中,这时的lines只是一个指向这个文件的指针。

第二行代码用来计算每行的长度(即每行包含多少个单词),同样,由于map()方法只是一个转换操作,这行代码执行后,不会立即计算每行的长度。

第三行代码的reduce()方法是一个“动作”类型的操作,这时,就会触发真正的计算。这时,Spark会把计算分解成多个任务在不同的机器上执行,每台机器运行位于属于它自己的map和reduce,最后把结果返回给Driver Program。

持久化

>>> list = ["Hadoop","Spark","Hive"]
>>> rdd = sc.parallelize(list)
>>> print(rdd.count()) //行动操作,触发一次真正从头到尾的计算
3
>>> print(','.join(rdd.collect())) //行动操作,触发一次真正从头到尾的计算
Hadoop,Spark,Hive

如上代码中,每次都会重新读取list,生成新的rdd,但rdd其只需要序列化一次即可,那么可以通过persist()方法对一个RDD标记为持久化。出现persist()语句的地方,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化,持久化后的RDD将会被保留在计算节点的内存中被后面的行动操作重复使用。

persist()中可传入持久化参数,具体如下:

  • MEMORY_ONLY :存储于JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容

  • MEMORY_AND_DISK:将RDD作为反序列化的对象存储在JVM中,如果内存不足,超出的分区将会被存放在硬盘上

cache()默认等于persist(MEMORY_ONLY)

例子如下:

>>> list = ["Hadoop","Spark","Hive"]
>>> rdd = sc.parallelize(list)
>>> rdd.cache()  //会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,这是rdd还没有被计算生成
>>> print(rdd.count()) //第一次行动操作,触发一次真正从头到尾的计算,这时才会执行上面的rdd.cache(),把这个rdd放到缓存中
3
>>> print(','.join(rdd.collect())) //第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd
Hadoop,Spark,Hive

RDD操作符

键值对操作符

>>> list = ["Hadoop","Spark","Hive","Spark"]
>>> rdd = sc.parallelize(list)
>>> pairRDD = rdd.map(lambda word : (word,1))
>>> pairRDD.foreach(print)
(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)

reduceByKey(func)

合并具有相同键的值,reduceByKey((a,b) => a+b),有四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),对具有相同key的键值对进行合并后的结果就是:(“spark”,3)、(“hadoop”,8)

groupByKey()

对具有相同键的值进行分组。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),采用groupByKey()后得到的结果是:(“spark”,(1,2))和(“hadoop”,(3,5))。

keys()

把键值对RDD中的key返回形成一个新的RDD。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)构成的RDD,采用keys()后得到的结果是一个RDD[Int],内容是“spark”,”spark”,”hadoop”,”hadoop”。

values()

把键值对RDD中的value返回形成一个新的RDD。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)构成的RDD,采用values()后得到的结果是一个RDD[Int],内容是1,2,3,5。

sortByKey()

sortByKey()的功能是返回一个根据键排序的RDD。

mapValues()

对键值对RDD的value部分进行处理,而不是同时对key和value进行处理。对于这种情形,比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)构成的pairRDD,如果执行pairRDD.mapValues(lambda x : x+1),就会得到一个新的键值对RDD,它包含下面四个键值对(“spark”,2)、(“spark”,3)、(“hadoop”,4)和(“hadoop”,6)。

join()

join(连接)操作是键值对常用的操作。“连接”(join)这个概念来自于关系数据库领域,因此,join的类型也和关系数据库中的join一样,包括内连接(join)、左外连接(leftOuterJoin)、右外连接(rightOuterJoin)等。最常用的情形是内连接,所以,join就表示内连接。
对于内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。

比如,pairRDD1是一个键值对集合(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),pairRDD2是一个键值对集合(“spark”,”fast”),那么,pairRDD1.join(pairRDD2)的结果就是一个新的RDD,这个新的RDD是键值对集合(“spark”,1,”fast”),(“spark”,2,”fast”)

以上是关于Spark编程基础总结的主要内容,如果未能解决你的问题,请参考以下文章

Spark编程基础总结

Spark编程基础总结

Spark系列

函数式编程的Java编码实践:利用惰性写出高性能且抽象的代码

函数式编程的Java编码实践:利用惰性写出高性能且抽象的代码

Spark惰性转换执行障碍