pyspark3.0.0+spark3.0.0学习01(RDD的创建,RDD算子(常用Transformation算子 常用Action算子 分区操作算子)的学习及应用)

Posted 房东地猫

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了pyspark3.0.0+spark3.0.0学习01(RDD的创建,RDD算子(常用Transformation算子 常用Action算子 分区操作算子)的学习及应用)相关的知识,希望对你有一定的参考价值。

pip install pyspark==你的spark版本
版本一定要相同

文章目录

Spark RDD 编程的程序入口对象是SparkContext对象(不论何种编程语言)

from pyspark import SparkConf,SparkContext
conf=SparkConf().setAppName("Learnpyspark").setMaster("local[1]")
sc=SparkContext(conf=conf)
#RDD的创建
#sc对象的parallelize方法,可以将本地集合转换成RDD返回给你
data=[1,2,3,4,5,6,7,8,9]
rdd=sc.parallelize(data,numSlices=3)
print(rdd.collect())
print(rdd.getNumPartitions())#获取RDD分区数
rdd1=sc.textFile("./Learnpyspark_datafiles/words.txt")
#rdd2=sc.textFile("hdfs://servername:8020/xxx/xxx")
print(rdd1.getNumPartitions())
print(rdd1.collect())
#wholeTextFiles适合读取一堆小文件
rdd3=sc.wholeTextFiles("./Learnpyspark_datafiles/tiny_files/*")
print(rdd3.getNumPartitions())
print(rdd3.collect())

#map
rdd=sc.parallelize([1,2,3,4,5])
print(rdd.map(lambda x:x*10).collect())
[10, 20, 30, 40, 50]
#flatMap
rdd=sc.parallelize(["a b c","d e f"])
print(rdd.flatMap(lambda x:x.split(" ")).collect())
rdd=sc.parallelize(["a,b,c","d,e,f"])
print(rdd.flatMap(lambda x:x.split(",")).collect())
['a', 'b', 'c', 'd', 'e', 'f']
['a', 'b', 'c', 'd', 'e', 'f']
#reduceByKey
rdd=sc.parallelize([("a",2),("b",5),("a",4),("b",3),("c",2)])
result=rdd.reduceByKey(lambda x,y:x+y)
print(result.collect())
[('a', 6), ('b', 8), ('c', 2)]
#WordCount
fileRDD=sc.textFile("./Learnpyspark_datafiles/words.txt")
wordsRDD=fileRDD.flatMap(lambda x:x.split(" "))
print("wordsRDD:",wordsRDD.collect())
wordWithOneRDD=wordsRDD.map(lambda x:(x,1))
print("wordWithOneRDD:",wordWithOneRDD.collect())
result=wordWithOneRDD.reduceByKey(lambda x,y:x+y).collect()
print("result:",result)

#groupBy
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=rdd1.groupBy(lambda num:'even' if(num %2==0) else 'odd')
#将rdd2的value转换成list,这样print可以输出内容
print(rdd2.map(lambda x:(x[0],list(x[1]))).collect())
[('odd', [1, 3, 5]), ('even', [2, 4])]
#filter
rdd=sc.parallelize([1,2,3,4,5])
#保留奇数
rdd1=rdd.filter(lambda x:True if(x%2==1)else False)
print(rdd1.collect())
[1, 3, 5]
#distinct
rdd=sc.parallelize([1,1,1,2,3,3,5,5,5,6,8,8,8])
print(rdd.distinct().collect())
[1, 2, 3, 5, 6, 8]
#union只合并不去重
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize([3,4,5,6,7])
union_rdd=rdd1.union(rdd2)
print(union_rdd.collect())
[1, 2, 3, 4, 5, 3, 4, 5, 6, 7]
#join
#只能用于二元数组,可实现SQL内/外连接
#部门ID和员工姓名
x=sc.parallelize([(1001,"张三"),(1002,"李四"),(1003,"王五")])
#部门ID和部门名称
y=sc.parallelize([(1001,"sales"),(1002,"tech")])
#join内连接
print("内连接:",x.join(y).collect())
print("左外连接:",x.leftOuterJoin(y).collect())
print("右外连接:",x.rightOuterJoin(y).collect())
内连接: [(1002, ('李四', 'tech')), (1001, ('张三', 'sales'))]
左外连接: [(1002, ('李四', 'tech')), (1001, ('张三', 'sales')), (1003, ('王五', None))]
右外连接: [(1002, ('李四', 'tech')), (1001, ('张三', 'sales'))]
#intersection求两个RDD的交集
rdd1=sc.parallelize([('a',1),('b',1)])
rdd2=sc.parallelize([('a',1),('c',1)])
intersection_rdd=rdd1.intersection(rdd2)
print(intersection_rdd.collect())
[('a', 1)]
rdd=sc.parallelize(data,numSlices=3)
rdd.glom().collect()
[[1, 2, 3], [4, 5, 6], [7, 8, 9]]
#groupByKey针对KV型RDD,自动按照key分组
rdd=sc.parallelize([('a',1),('a',2),('b',2),('b',4),('c',5)])
grouped_rdd=rdd.groupByKey()
print(grouped_rdd.map(lambda x:(x[0],list(x[1]))).collect())
[('a', [1, 2]), ('b', [2, 4]), ('c', [5])]
#sortBy
rdd=sc.parallelize([4,5,3,1,2])
rdd.sortBy(lambda x:x,ascending=False).collect()
# ascending=False降序,True升序
# numPartitions=用多少分区排序,全局有序,设置1或不设
'''
[5, 4, 3, 2, 1]
#sortByKey针对KV型RDD,自动按照key排序
rdd = sc.parallelize([('a', 1), ('E', 1), ('C', 1), ('D', 1), ('b', 1), ('g', 1), ('f', 1),
                          ('y', 1), ('u', 1), ('i', 1), ('o', 1), ('p', 1),
                          ('m', 1), ('n', 1), ('j', 1), ('k', 1), ('l', 1)], 3)

print(rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=lambda key:str(key).lower()).collect())
# 全部转换为小写再排序,但是不改变数据
[('a', 1), ('b', 1), ('C', 1), ('D', 1), ('E', 1), ('f', 1), ('g', 1), ('i', 1), ('j', 1), ('k', 1), ('l', 1), ('m', 1), ('n', 1), ('o', 1), ('p', 1), ('u', 1), ('y', 1)]
#countByKey
rdd1=sc.textFile("./Learnpyspark_datafiles/words.txt")
rdd2=rdd1.flatMap(lambda x:x.split(" "))
rdd3=rdd2.map(lambda x:(x,1))
result=rdd3.countByKey()
print(result)
defaultdict(<class 'int'>, 'hello': 3, 'spark': 1, 'hadoop': 1, 'flink': 1)
#rdd.collect()
#数据集结果不能太大,不然会把内存撑爆
#reduce对RDD数据集按照你传入的逻辑进行聚合
rdd=sc.parallelize(range(1,10))#1--9
print(rdd.reduce(lambda a,b:a+b))
45
#fold
rdd=sc.parallelize(range(1,10),3)
print(rdd.fold(10,lambda a,b:a+b))
# 每个分区聚合时带上10
85
#first取出RDD第一个元素
rdd=sc.parallelize([1,2,3])
print(rdd.first())
1
#take取出RDD前n个元素
rdd=sc.parallelize([1,2,3])
print(rdd.take(2))
[1, 2]
#top对RDD数据集进行降序排序,取前n个
rdd=sc.parallelize([3,2,1,5,8,6,4])
rdd.top(4)
[8, 6, 5, 4]
rdd=sc.parallelize(range(1,10))
rdd.count()
9
#takeSample随机抽样RDD数据
# True,False是否重复表示同一个位置的数据
# 参数2,抽样几个;参数3随机数种子,一般不传
rdd=sc.parallelize([1,2,3,5,6,8,9],2)
print(rdd.takeSample(True,4))
[1, 6, 3, 8]
rdd=sc.parallelize([1,3,2,4,7,9,6])
print(rdd.takeOrdered(3))
# 将数字转换为负数,原本正数最大的变为最小的
print(rdd.takeOrdered(3,lambda x:-x))
[1, 2, 3]
[9, 7, 6]
#foreach和map一个意思,但是这个方法没有返回值
rdd=sc.parallelize(range(1,10),1)
result=rdd.foreach(lambda x:print(x))#rdd.foreach(print)
# rdd=sc.parallelize(range(1,10),3)
# rdd.saveAsTextFile("hdfs://servername:8020/xx/xx")
#mapPartitions
rdd = sc.parallelize([1, 3, 2, 4, 7, 9, 6], 3)
def process(iter):
    result = list()
    for it in iter:
        result.append(it * 10)
    return result
print(rdd.mapPartitions(process).collect())
[10, 30, 20, 40, 70, 90, 60]
#foreachPartition和普通foreach一样,一次处理的是一整个分区数据
rdd=sc.parallelize([1,3,2,4,7,9,6],3)
def ride10(data):
    print("--------")
    result=list()
    for i in data:
        result.append(i*10)
    print(result)
rdd.foreachPartition(ride10)
rdd=sc.parallelize([('hadoop',1),('spark',2),('hello',4),('spark',5),('pyspark',2)])
def partition_self(key):
    if 'hadoop'==key:return 0
    if('spark'==key or 'hello'==key):return 1
    return 2
print(rdd.partitionBy(3,partition_self).glom().collect())
[[('hadoop', 1)], [('spark', 2), ('hello', 4), ('spark', 5)], [('pyspark', 2)]]
# repartition对RDD分区执行重新分区(仅数量)
rdd=sc.parallelize(range(1,10))
rdd1=rdd.repartition(5)
print(rdd1.glom().collect())
rdd2=rdd.repartition(1)
print(rdd2.glom().collect())
[[], [1, 2, 3, 4, 5, 6, 7, 8, 9], [], [], []]
[[1, 2, 3, 4, 5, 6, 7, 8, 9]]
rdd=sc.parallelize(range(1,10))
print(rdd.coalesce(5,shuffle=True).glom().collect())
# 如果不写shuffle=True,那么只能减少分区,增加分区操作将不会执行
[[], [1, 2, 3, 4, 5, 6, 7, 8, 9], [], [], []]
# mapValues针对二元元组RDD,对其内部的二元元组的value执行map操作
rdd=sc.parallelize([('a',5),('a',12),('a',2),('b',6),('b',3)])
# rdd.map(lambda x:(x[0],x[1]*10))
print(rdd.mapValues(lambda x:x*10).collect())
[('a', 50), ('a', 120), ('a', 20), ('b', 60), ('b', 30)]

以上是关于pyspark3.0.0+spark3.0.0学习01(RDD的创建,RDD算子(常用Transformation算子 常用Action算子 分区操作算子)的学习及应用)的主要内容,如果未能解决你的问题,请参考以下文章

在 EMR 集群上引导 Spark 3.0.0

学习进度十三(Scala独立应用编程)

学习进度十四(Spark之Java独立应用编程)

spark 3.0.0 中的 CBORFactory NoClassDefFoundError 异常

创建 SparkSession 的 Spark 3.0.0 错误:pyspark.sql.utils.IllegalArgumentException:<exception str() fail

Window7 开发 Spark 应用