Spark-scalaI-API
Posted 指尖上的艺术
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark-scalaI-API相关的知识,希望对你有一定的参考价值。
1、sc.version
2、集群对象:SparkContext;获得Spark集群的SparkContext对象,是构造Spark应用的第一步!
SparkContext对象代表 整个 Spark集群,是Spark框架 功能的入口 ,可以用来在集群中创建RDD、累加器变量和广播变量。
SparkContext对象创建时可以指明连接到哪个集群管理器上,在Spark-Shell启动时,默认 连接到本地的集群管理器。
使用SparkContext对象(在Shell里,就是sc变量)的master方法,可以查看当前连接的集群管理器:sc.master
3、分布数据集:RDD;使用SparkContext对象创建RDD数据集,然后,才能干点有意义的事情!
Spark的核心抽象是一个分布式数据集,被称为弹性分布数据集(RDD) ,代表一个不可变的、可分区、可被并行处理 的成员集合。
RDD对象需要利用SparkContext对象的方法创建,Spark支持从多种来源创建RDD对象,比如:从本地文本文件创建、从Hadoop 的HDFS文件创建、或者通过对其他RDD进行变换获得新的RDD。
下面的示例使用本地Spark目录下的README.md文件创建一个新的RDD:
scala> val textFile = sc.textFile("README.md")
textFile: spark.RDD[String] = [email protected]
我们看到,执行的结果是,返回了一个Spark.RDD类型的变量textFile,RDD是一个模板类,方括号里的String代表 这个RDD对象成员的类型。由于是一个对象,因此值用地址表示:[email protected] 。
SparkContext对象的textFile方法创建的RDD中,一个成员对应原始文件的一行。我们看到在执行的结果中可以看到返回一个 RDD,成员类型为String,我们将这个对象保存在变量textFile中。
使用README.md文件,创建一个RDD,保存到变量 textFile中。
4、操作数据集:RDD可以执行两种操作:变换与动作
RDD的内部实现了分布计算的功能,我们在RDD上执行的操作,是透明地在整个集群上执行的。也就是说,当RDD建立 后,这个RDD就不属于本地了,它在整个集群中有效。当在RDD上执行一个操作,RDD内部需要和集群管理器进行沟通协商。
对一个RDD可以进行两种操作:动作(action)和变换(transformation)。动作总是从集群中取回数据,变换总是获得一个新的RDD,这是两种操作的字面上的差异。
事实上,当在RDD上执行一个变换时,RDD仅仅记录要做的变换,只有当RDD上需要执行一个动作时,RDD才 通过集群管理器启动实质分布计算。
这有点像拍电影,变换操作只是剧本,只有导演喊Action的时候,真正的电影才开始制作。
5、感受动作和变换的区别;RDD操作分为两种:动作和变换,只有动作才会触发计算!
下面的例子首先做一个映射变换,然后返回新纪录的条数。map是一个变换,负责将原RDD的每个记录变换到新的RDD,count是一个动作,负责获取这个RDD的记录总数。
先执行map,你应该看到很迅速干净地返回:
scala> val rdd2=textFile.map(line=>line.length)
rdd2: org.apache.spark.rdd.RDD[Int] = MappedRDD[52] ...
再执行count,这会有些不一样:
scala> rdd2.count()
......
res10: Long = 141
.....
当执行map时,我们看到结果很快返回了。但当执行count时,我们可以看到一堆的提示信息,大概的意思就是 和调度器进行了若干沟通才把数据拉回来。
看起来确实这样,变换操作就只是写写剧本,Action才真正开始执行计算任务。
6、RDD动作:获取数据的控制权;RDD动作将数据集返回本地
对一个RDD执行动作指示集群将指定数据返回本地,返回的数据可能是一个具体的值、一个数组或一个HASH表。
让我们先执行几个动作:
scala> textFile.count() // 这个动作返回RDD中的记录数
res0: Long = 126
scala> textFile.first() // 这个动作返回RDD中的第一个记录
count是一个动作,负责获取这个RDD的记录总数。first也是一个动作,负责返回RDD中的第一条记录。
在使用Spark时,最好在脑海中明确地区隔出两个区域:本地域和集群域。RDD属于集群域,那是Spark管辖的地带;RDD的动作结果属于本地域,这是我们的地盘。
只有当RDD的数据返回本地域,我们才能进行再加工,比如打印等等。
7、RDD变换:数据的滤镜;RDD变换总是返回RDD,这让我们可以把变换串起来!
RDD变换将产生一个新的RDD。下面的例子中,我们执行一个过滤(Filter)变换,将获得一个新的RDD,由原 RDD中符合过滤条件(即:包含单词Spark)的记录成员构成:
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: spark.RDD[String] = [email protected]
变量lineWithSpark现在是一个RDD,由变量textFile这个RDD中所有包含"Spakr"单词的行构成。
由于一个RDD变换总是返回一个新的RDD,因此我们可以将变换和动作使用链式语法串起来。下面的 例子使用了链式语法解决一个具体问题:在文件中有多少行包含单词“Spark”?
scala> textFile.filter(line => line.contains("Spark")).count()
res3: Long = 15
这等同于:
scala> val rdd1 = textFile.filter(line => line.contains("Spark"))
...
scala> rdd1.count()
res12: Long = 15
用链式语法写起来更流畅一些,不过这只是一种口味的倾向而已。
8、RDD操作组合;RDD的变换有点像PS的滤镜,有时要用好几个滤镜,才能把脸修好。
RDD的诸多动作和变换,经过组合也可以实现复杂的计算,满足相当多现实的数据计算需求。
假设我们需要找出文件中单词数量最多的行,做个map/reduce就可以了:
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15
上面语句首先使用map变换,将每一行(成员)映射为一个整数值(单词数量),这获得了一个新的RDD。然后在 这个新的RDD上执行reduce动作,找到(返回)了单词数量最多的行。
9、count :计数
使用count成员函数获得RDD对象的成员总数,返回值为长整型
10、top :前N个记录
使用top成员函数获得RDD中的前N个记录,可以指定一个排序函数进行排序比较。 如果不指定排序函数,那么使用默认的Ascii码序进行记录排序。
返回值包含前N个记录的数组,记录类型为T。
11、take:无序采样
使用take成员函数获得指定数量的记录,返回一个数组。与top不同,take在提取记录 前不进行排序,它仅仅逐分区地提取够指定数量的记录就返回结果。可以将take方法 视为对RDD对象的无序采样。
返回值包含指定数量记录的数组,记录类型为T。
12、first : 取第一个记录;使用first成员函数获得RDD中的第一个记录。
使用RDD的first方法获得第一条记录。不过,没有last方法!
13、max : 取值最大的记录
使用max成员函数获得值最大的记录,可以指定一个排序函数进行排序比较。默认使用 Ascii码序进行排序。
14、min : 取值最小的记录
使用min成员函数获得值最小的记录,可以指定一个排序函数进行排序比较。默认使用 Ascii码序进行排序。
15、reduce : 规约RDD;使用RDD的reduce方法进行聚合!
使用reduce成员函数对RDD进行规约操作,必须指定一个函数指定规约行为。
语法
def reduce(f: (T, T) => T): T
参数 f : 规约函数 , 两个参数分别代表RDD中的两个记录,返回值被RDD用来进行递归计算。
示例
下面的示例使用匿名函数,将所有的记录连接起来构成一个字符串:
scala> textFile.reduce((a,b)=>a+b)
res60:String = #Apache SparkSpake is a fast...
16、collect : 收集全部记录
使用collect成员函数获得RDD中的所有记录,返回一个数组。collect方法 可以视为对RDD对象的一个全采样。
17、map : 映射
映射变换使用一个映射函数对RDD中的每个记录进行变换,每个记录变换后的新值集合构成一个新的RDD。
语法
def map[U](f: (T) => U)(implicit arg0: ClassTag[U]): RDD[U]
参数
f : 映射函数 , 输入参数为原RDD中的一个记录,返回值构成新RDD中的一个记录。
下面的示例将textFile的每个记录(字符串)变换为其长度值,获得一个新的RDD,然后取回第一个记录查看:
scala> textFile.map(line=>line.length).first()
res13:Int = 14
18、filter : 过滤
过滤变换使用一个筛选函数对RDD中的每个记录进行筛选,只有筛选函数返回真值的记录,才 被选中用来构造新的RDD。
语法
def filter(f: (T) => Boolean): RDD[T]
参数
f : 筛选函数,输入参数为原RDD中的一个元素,返回值为True或False 。
下面的示例仅保留原RDD中字符数多于20个的记录(行),获得一个新的RDD,然后取回第一个 记录查看:
scala> textFile.filter(line=>line.length>20).first()
res20: String = Spark is a fast and generic .
19、sample : 采样;使用RDD的sample方法获得一个采样RDD!
采样变换根据给定的随机种子,从RDD中随机地按指定比例选一部分记录,创建新的RDD。采样变换 在机器学习中可用于进行交叉验证。
语法
def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
参数
withReplacement : Boolean , True表示进行替换采样,False表示进行非替换采样
fraction : Double, 在0~1之间的一个浮点值,表示要采样的记录在全体记录中的比例
seed :随机种子
示例
下面的示例从原RDD中随机选择20%的记录,构造一个新的RDD,然后返回新RDD的记录数:
scala> textFile.sample(true,0.2).count()
res12: Long = 26
20、union : 合并;使用RDD的union方法,可以获得两个RDD的并集!
合并变换将两个RDD合并为一个新的RDD,重复的记录不会被剔除。
语法
def union(other: RDD[T]): RDD[T]
参数
other : 第二个RDD
示例
下面的示例,首先对textFile这个RDD进行一个每行反转的映射变换,获得一个新的RDD,再 将这个新的RDD和原来的RDD:textFile进行合并,最后我们使用count查看一下总记录数:
scala> textFile.map(line=>line.reverse).union(textFile).count()
res13: Long = 282
可以看到,合并后的总记录数是原来的2倍。
21、intersection : 相交;使用RDD的intersection方法,可以获得两个RDD的交集!
相交变换仅取两个RDD共同的记录,构造一个新的RDD。
语法
def intersection(other: RDD[T]): RDD[T]
参数
other : 第二个RDD
示例
下面的示例将每个记录进行逆转后的RDD与原RDD相交,获得一个新的RDD,我们使用collect回收全部 数据以便显示:
scala> textFile.map(line=>line.reverse).intersection(textFile).collect()
res27: Array[String] =Array(" ","")
可以看到,只有空行被保留下来,因为空行的逆序保持不变。
22、distinct : 剔重;使用RDD的distinct方法,可以进行记录剔重!
剔重变换剔除RDD中的重复记录,返回一个新的RDD。
语法
def distinct(): RDD[T]
示例
下面的示例将RDD中重复的行剔除,并返回新RDD中的记录数:
sala> textFile.distinct().count()
res20: Long =91
以上是关于Spark-scalaI-API的主要内容,如果未能解决你的问题,请参考以下文章