Two ways to create an RDD: 1) loading an external dataset
eg: lines = sc.textFile(filename)
2) distributing a collection of objects
eg: lines = sc.parallelize(["pandas", "numpy"])  # loads the entire dataset into memory at once, so not widely used outside prototyping
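The snippets in these notes assume an existing SparkContext named sc; a minimal setup sketch (the app name "rdd-notes" is arbitrary):
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("rdd-notes")
sc = SparkContext(conf=conf)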
An RDD supports two kinds of operations: transformations and actions.
1) transformations: construct a new RDD from an existing one; the return value is a new RDD
eg: pythonLines = lines.filter(lambda line: "python" in line)  # Python
    val pythonLines = lines.filter(line => line.contains("python"))  // Scala
eg2: errorsRDD = inputRDD.filter(lambda x: "error" in x)
     warningsRDD = inputRDD.filter(lambda x: "warning" in x)
     badLinesRDD = errorsRDD.union(warningsRDD)
2) actions: compute a result from an RDD and either return it to the driver program or save it to an external storage system; the return value is some other data type, not an RDD
eg1: pythonLines.first()
eg2: println("Input had " + badLinesRDD.count() + " concerning lines")
     println("Here are 10 examples:")
     badLinesRDD.take(10).foreach(println)  // collect() does the same job as take() but retrieves the entire RDD, so it is only usable when the whole dataset fits in memory on a single machine
eg3: saveAsTextFile()  # saves the RDD's contents to HDFS or another distributed storage system
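A small end-to-end sketch tying the pieces above together, assuming a hypothetical log file log.txt:
inputRDD = sc.textFile("log.txt")                             # transformation: nothing is read yet
errorsRDD = inputRDD.filter(lambda x: "error" in x)
warningsRDD = inputRDD.filter(lambda x: "warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)
print("Input had %d concerning lines" % badLinesRDD.count())  # action: triggers the computation
for line in badLinesRDD.take(10):                             # bring a small sample back to the driver
    print(line)
badLinesRDD.saveAsTextFile("bad_lines_out")                   # output directory name is arbitrary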
Key characteristics:
1) Spark computes transformations and actions in a lazy fashion
eg:
pythonLines.first()  # Spark stops after finding the first matching line; it does not read the whole file
2) if the data will be used multiple times, call persist() to keep it in memory (persisting to disk is also possible)
eg:
pythonLines.persist()
pythonLines.count()
pythonLines.first()
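persist() also accepts an explicit storage level; a sketch using MEMORY_AND_DISK, which spills partitions to disk when memory runs out:
from pyspark import StorageLevel
pythonLines.persist(StorageLevel.MEMORY_AND_DISK)
pythonLines.count()   # the first action materializes and caches the RDD
pythonLines.first()   # later actions reuse the cached data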
Passing functions to Spark
Python (all three styles are sketched after this list):
1) in lambda expressions
2) in top-level functions
3) in locally defined functions
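A minimal sketch of all three styles (names like containsError and getErrors are illustrative):
lines = sc.parallelize(["error: disk full", "info: ok", "error: timeout"])
errors1 = lines.filter(lambda line: "error" in line)  # 1) lambda expression
def containsError(line):                              # 2) top-level function
    return "error" in line
errors2 = lines.filter(containsError)
def getErrors(rdd):
    def isError(line):                                # 3) locally defined function
        return "error" in line
    return rdd.filter(isError)
errors3 = getErrors(lines)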
Basic RDDs:
1) flatMap(): produce multiple output elements from each input element
eg:
lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
words.first()  # returns "hello"
Note: the difference between map() and flatMap()
eg:
tokenize("coffee panda") = List("coffee","panda")
RDD1 = {"coffee panda","happy panda","happiest panda party"}
RDD1.map(tokenize) ={["coffee","panda"],["happy","panda"],["happiest","panda","party"]}
RDD1.flatMap(tokenize) = {"coffee","panda","happy","panda","happiest","panda","party"}
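The same comparison as runnable PySpark:
RDD1 = sc.parallelize(["coffee panda", "happy panda", "happiest panda party"])
tokenize = lambda line: line.split(" ")
RDD1.map(tokenize).collect()      # [['coffee', 'panda'], ['happy', 'panda'], ['happiest', 'panda', 'party']] -- an RDD of lists
RDD1.flatMap(tokenize).collect()  # ['coffee', 'panda', 'happy', 'panda', 'happiest', 'panda', 'party'] -- the lists are flattened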
2) set-like transformation functions:
RDD.distinct()    # remove duplicate elements
union()           # union of two RDDs, but duplicates are kept
intersection()    # common elements only; performs worse because finding them requires a shuffle
subtract()        # the elements of A that are not in B
cartesian()       # Cartesian product of two RDDs
eg:
RDD1={A,B,B,C}
RDD2={C,D}
RDD1.distinct()={A,B,C}
RDD1.union(RDD2)={A,B,B,C,C,D}
RDD1.intersection(RDD2)={C}
RDD1.subtract(RDD2)={A,B,B}
RDD1.cartesian(RDD2)={(A,C),(A,D),(B,C),(B,D),(B,C),(B,D),(C,C),(C,D)}
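The same example as runnable PySpark, with strings standing in for A, B, C, D (element order in the results may vary):
RDD1 = sc.parallelize(["A", "B", "B", "C"])
RDD2 = sc.parallelize(["C", "D"])
RDD1.distinct().collect()          # ['A', 'B', 'C']
RDD1.union(RDD2).collect()         # ['A', 'B', 'B', 'C', 'C', 'D']
RDD1.intersection(RDD2).collect()  # ['C']
RDD1.subtract(RDD2).collect()      # ['A', 'B', 'B']
RDD1.cartesian(RDD2).collect()     # [('A', 'C'), ('A', 'D'), ('B', 'C'), ('B', 'D'), ...] -- 8 pairs in total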
3) action functions:
reduce()
eg:
sum = rdd.reduce( lambda x,y : x+y )
collect()  # returns the RDD's elements as a list
eg:
RDD1.collect()=[A,B,B,C]
fold()  # like reduce(), but takes a "zero value" used as the initial result on each partition
eg:
sum = rdd.fold(0, lambda x, y: x + y)
aggregate()
eg:
sumCount = nums.aggregate((0, 0),
        (lambda acc, value: (acc[0] + value, acc[1] + 1)),
        (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
return sumCount[0] / float(sumCount[1])
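The return statement implies these lines live inside a function; a complete, runnable wrapper (the name average is illustrative):
def average(nums):
    sumCount = nums.aggregate((0, 0),
            (lambda acc, value: (acc[0] + value, acc[1] + 1)),            # seqOp: fold one element into a (sum, count) pair
            (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))  # combOp: merge the per-partition pairs
    return sumCount[0] / float(sumCount[1])
nums = sc.parallelize([1, 2, 3, 4])
print(average(nums))  # 2.5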
Working with Key/Value Pairs