Spark编程基础总结
Posted Alex_MaHao
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark编程基础总结相关的知识,希望对你有一定的参考价值。
初始化Spark
// 创建spark配置
val conf = new SparkConf().setAppName(appName).setMaster(master)
// SparkContext上下文对象
new SparkContext(conf)
RDDS
Spark 核心的概念是 Resilient Distributed Dataset (RDD):一个可并行操作的有容错机制的数据集合。
有 2 种方式创建 RDDs:
-
第一种是在你的驱动程序中并行化一个已经存在的集合;
-
另外一种是引用一个外部存储系统的数据集,例如共享的文件系统,HDFS,HBase或其他 Hadoop 数据格式的数据源。
并行集合
并行集合 (Parallelized collections) 的创建是通过在一个已有的集合(Scala Seq )上调用 SparkContext 的 parallelize 方法 实现的。集合中的元素被复制到一个可并行操作的分布式数据集中。
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
我们可以调用 distData.reduce((a, b) => a + b) 将这个数组中的元素相加
并行集合一个很重要的参数是切片数(slices),表示一个数据集切分的份数。Spark 会在集群上为每一个切片运行一个任务。 你可以在集群上为每个 CPU 设置 2-4 个切片(slices)。正常情况下,Spark 会试着基于你的集群状况自动地设置切片的数 目。然而,你也可以通过 parallelize 的第二个参数手动地设置(例如: sc.parallelize(data, 10) )。
外部数据集
Spark采用textFile()方法来从文件系统中加载数据创建RDD,该方法把文件的URI作为参数,这个URI可以是本地文件系统的地址,或者是分布式文件系统HDFS的地址,或者是Amazon S3的地址等等。
val distFile = sc.textFile("data.txt")
>>> lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
>>> lines = sc.textFile("/user/hadoop/word.txt")
>>> lines = sc.textFile("word.txt")
RDD 操作
转换操作
对于RDD而言,每一次转换操作都会产生不同的RDD,供给下一个“转换”使用。转换得到的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作。
常见的转换操作如下
-
filter(func):筛选出满足函数func的元素,并返回一个新的数据集
-
map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集
-
flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果
-
groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集
-
reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合
行动操作
行动操作是真正触发计算的地方。Spark程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。
-
count() 返回数据集中的元素个数
-
collect() 以数组的形式返回数据集中的所有元素
-
first() 返回数据集中的第一个元素
-
take(n) 以数组的形式返回数据集中的前n个元素
-
reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
-
foreach(func) 将数据集中的每个元素传递到函数func中运行
惰性机制
>>> lines = sc.textFile("data.txt")
>>> lineLengths = lines.map(lambda s : len(s))
>>> totalLength = lineLengths.reduce( lambda a, b : a + b)
上面第一行首先从外部文件data.txt中构建得到一个RDD,名称为lines,但是,由于textFile()方法只是一个转换操作,因此,这行代码执行后,不会立即把data.txt文件加载到内存中,这时的lines只是一个指向这个文件的指针。
第二行代码用来计算每行的长度(即每行包含多少个单词),同样,由于map()方法只是一个转换操作,这行代码执行后,不会立即计算每行的长度。
第三行代码的reduce()方法是一个“动作”类型的操作,这时,就会触发真正的计算。这时,Spark会把计算分解成多个任务在不同的机器上执行,每台机器运行位于属于它自己的map和reduce,最后把结果返回给Driver Program。
持久化
>>> list = ["Hadoop","Spark","Hive"]
>>> rdd = sc.parallelize(list)
>>> print(rdd.count()) //行动操作,触发一次真正从头到尾的计算
3
>>> print(','.join(rdd.collect())) //行动操作,触发一次真正从头到尾的计算
Hadoop,Spark,Hive
如上代码中,每次都会重新读取list,生成新的rdd,但rdd其只需要序列化一次即可,那么可以通过persist()方法对一个RDD标记为持久化。出现persist()语句的地方,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化,持久化后的RDD将会被保留在计算节点的内存中被后面的行动操作重复使用。
persist()中可传入持久化参数,具体如下:
-
MEMORY_ONLY :存储于JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容
-
MEMORY_AND_DISK:将RDD作为反序列化的对象存储在JVM中,如果内存不足,超出的分区将会被存放在硬盘上
cache()默认等于persist(MEMORY_ONLY)
例子如下:
>>> list = ["Hadoop","Spark","Hive"]
>>> rdd = sc.parallelize(list)
>>> rdd.cache() //会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,这是rdd还没有被计算生成
>>> print(rdd.count()) //第一次行动操作,触发一次真正从头到尾的计算,这时才会执行上面的rdd.cache(),把这个rdd放到缓存中
3
>>> print(','.join(rdd.collect())) //第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd
Hadoop,Spark,Hive
RDD操作符
键值对操作符
>>> list = ["Hadoop","Spark","Hive","Spark"]
>>> rdd = sc.parallelize(list)
>>> pairRDD = rdd.map(lambda word : (word,1))
>>> pairRDD.foreach(print)
(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)
reduceByKey(func)
合并具有相同键的值,reduceByKey((a,b) => a+b),有四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),对具有相同key的键值对进行合并后的结果就是:(“spark”,3)、(“hadoop”,8)
groupByKey()
对具有相同键的值进行分组。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),采用groupByKey()后得到的结果是:(“spark”,(1,2))和(“hadoop”,(3,5))。
keys()
把键值对RDD中的key返回形成一个新的RDD。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)构成的RDD,采用keys()后得到的结果是一个RDD[Int],内容是“spark”,”spark”,”hadoop”,”hadoop”。
values()
把键值对RDD中的value返回形成一个新的RDD。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)构成的RDD,采用values()后得到的结果是一个RDD[Int],内容是1,2,3,5。
sortByKey()
sortByKey()的功能是返回一个根据键排序的RDD。
mapValues()
对键值对RDD的value部分进行处理,而不是同时对key和value进行处理。对于这种情形,比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)构成的pairRDD,如果执行pairRDD.mapValues(lambda x : x+1),就会得到一个新的键值对RDD,它包含下面四个键值对(“spark”,2)、(“spark”,3)、(“hadoop”,4)和(“hadoop”,6)。
join()
join(连接)操作是键值对常用的操作。“连接”(join)这个概念来自于关系数据库领域,因此,join的类型也和关系数据库中的join一样,包括内连接(join)、左外连接(leftOuterJoin)、右外连接(rightOuterJoin)等。最常用的情形是内连接,所以,join就表示内连接。
对于内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。
比如,pairRDD1是一个键值对集合(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),pairRDD2是一个键值对集合(“spark”,”fast”),那么,pairRDD1.join(pairRDD2)的结果就是一个新的RDD,这个新的RDD是键值对集合(“spark”,1,”fast”),(“spark”,2,”fast”)
以上是关于Spark编程基础总结的主要内容,如果未能解决你的问题,请参考以下文章
函数式编程的Java编码实践:利用惰性写出高性能且抽象的代码