Spark RDD 操作

Posted 风老魔

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark RDD 操作相关的知识,希望对你有一定的参考价值。

1. 创建 RDD

主要两种方式:

  • sc.textFile 加载本地或集群文件系统中的数据,或者从 HDFS 文件系统、HBase、Cassandra、Amazon S3等外部数据源中加载数据集。Spark可以支持文本文件、SequenceFile文件(Hadoop提供的 SequenceFile是一个由二进制序列化过的key/value的字节流组成的文本存储文件)和其他符合Hadoop InputFormat格式的文件
  • parallelize 方法将 Driver 中数据结构化并行成 RDD
>>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
>>> lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
>>> lines = sc.textFile("/user/hadoop/word.txt")
>>> lines = sc.textFile("word.txt")

# 并行化
nums = [1, 2, 3, 5, 6]
rdd = sc.parallelize(nums)

注意

  • 使用本地文件系统路径,须保证在所有 worker 节点上都能采用相同路径能够访问该文件(可将文件包括到每个 worker 节点上,或采用网络挂载共享文件系统)
  • textFile() 参数可以是文件、目录、压缩文件
  • textFile() 接收第二个参数(可选),用于指定分区数,默认 sparkHDFSblock 创建一个分区,(HDFS中每个block默认是128MB),可以提供一个比 block 更大的值作为分区数目,但是不能比它小

2. RDD 操作

RDD 创建后,在后续过程中会有两种操作:

  • 转换 transformation 操作:基于现有数据集创建一个新的数据集,转换得到的 RDD 是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作,不会触发计算
  • 行动 action 操作:在数据集上进行运算,返回计算值,会触发计算

2.1 常用Transformation操作

2.1.1 map

将分区中的每份数据都作用到一个 function 中,生成一个新的分布式的数据集并返回,类似于 Python 内置的 map 方法:

from pyspark import SparkConf, SparkContext

if __name__ == "__main__":
    conf = SparkConf().setMaster("local[2]").setAppName("spark0401")
    sc = SparkContext(conf=conf)


    def my_app():
        """
        data 中每个元素都乘以 2
        """
        data = [1, 2, 3, 4, 5, 6]
        rdd = sc.parallelize(data).map(lambda x: x * 2)

        print(rdd.collect())


    my_app()

    sc.stop()	# 记得关闭

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-j2lIM7QC-1675174940313)(C:/Users/hj/AppData/Roaming/Typora/typora-user-images/image-20201220223223341.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XiohZxoq-1675174940314)(C:/Users/hj/AppData/Roaming/Typora/typora-user-images/image-20201220223534810.png)]

2.1.2 filter

选出所有 function 返回值为 True 的元素,生成一个新的分布式的数据集返回:

from pyspark import SparkConf, SparkContext

if __name__ == "__main__":
    conf = SparkConf().setMaster("local[2]").setAppName("spark0401")
    sc = SparkContext(conf=conf)

    def my_filter():
        data = [1, 2, 3, 4, 5, 6]
        rdd1 = sc.parallelize(data).map(lambda x: x * 2)
        filer_rdd = rdd1.filter(lambda x: x > 4)

        print(filer_rdd.collect())

    my_filter()

    sc.stop()

2.1.3 flatMap

将函数应用于 rdd 之中的每一个元素,将返回的迭代器的所有内容构成新的 rdd,通常用来切分单词:

from pyspark import SparkConf, SparkContext

if __name__ == "__main__":
    conf = SparkConf().setMaster("local[2]").setAppName("spark0401")
    sc = SparkContext(conf=conf)

    def my_flat_map():
        data = ["hello spark", "hello python", "hello world"]
        rdd = sc.parallelize(data).flatMap(lambda line: line.split(" "))
        print(rdd.collect())

    my_flat_map()

    sc.stop()

运行结果:

['hello', 'spark', 'hello', 'python', 'hello', 'world']

2.1.4 union

连接、合并多个 rdd

from pyspark import SparkConf, SparkContext

if __name__ == "__main__":
    conf = SparkConf().setMaster("local[2]").setAppName("spark0109")
    sc = SparkContext(conf=conf)
    
    def my_union():
        """连接"""
        rdd1 = sc.parallelize([1, 2, 3])
        rdd2 = sc.parallelize(['a', 'b', 'c'])
        rdd_union = rdd1.union(rdd2)

        print(rdd_union.collect())


    my_union()
    sc.stop()

运行结果:

[1, 2, 3, 'a', 'b', 'c']

2.1.5 distinct 去重

rdd 中相同元素进行去重:

from pyspark import SparkConf, SparkContext

if __name__ == "__main__":
    conf = SparkConf().setMaster("local[2]").setAppName("spark0109")
    sc = SparkContext(conf=conf)
    
    def my_distinct():
        rdd1 = sc.parallelize([1, 2, 3])
        rdd2 = sc.parallelize([1, 'a', '2', 'b'])
        rdd_distinct = rdd1.union(rdd2).distinct()

        print(rdd_distinct.collect())


    my_distinct()
sc.stop()

运行结果:

['b', 1, 'a', 2, 3, '2']

2.1.6 join 连接

类似于 SQLjoin,包括:

  • inner join:内连接
  • outer joinleft/right/full join 外连接(左外、右外、全连接)
from pyspark import SparkConf, SparkContext

if __name__ == "__main__":
    conf = SparkConf().setMaster("local[2]").setAppName("spark0109")
    sc = SparkContext(conf=conf)
    
    def my_join():
        a = sc.parallelize([('A', 'a1'), ('C', 'c1'), ('D', 'd1'), ('F', 'f1'), ('F', 'f2')])
        b = sc.parallelize([('A', 'a2'), ('C', 'c2'), ('C', 'c3'), ('E', 'e1')])
        
        join_res = a.join(b).collect()
        left_join_res = a.leftOuterJoin(b).collect()	# 只关心左边有的数据,左边没有的为 None
        right_join_res = a.rightOuterJoin(b).collect()	# 只关心右边有的数据,右边没有的为 None
        full_join_res = a.fullOuterJoin(b).collect()

        print('a join b >>>', join_res)
        print('left_join_res >>>', left_join_res)
        print('right_join_res >>>', right_join_res)
        print('full_join_res >>>', full_join_res)


    my_join()
sc.stop()

运行结果:

a join b >>> [('A', ('a1', 'a2')), ('C', ('c1', 'c2')), ('C', ('c1', 'c3'))]

left_join_res >>> [('A', ('a1', 'a2')), ('F', ('f1', None)), ('F', ('f2', None)), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('D', ('d1', None))]

right_join_res >>> [('A', ('a1', 'a2')), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('E', (None, 'e1'))]

full_join_res >>> [('A', ('a1', 'a2')), ('F', ('f1', None)), ('F', ('f2', None)), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('D', ('d1', None)), ('E', (None, 'e1'))]

其他操作

# subtract找到属于前一个rdd而不属于后一个rdd的元素
>>> a = sc.parallelize(range(10))
>>> b = sc.parallelize(range(5,15))
>>> a.subtract(b).collect()
[0, 1, 2, 3, 4]            

# 求交集 
>>> a.intersection(b).collect()
[6, 7, 8, 9, 5]          

# cartesian笛卡尔积
>>> boys = sc.parallelize(["LiLei","Tom"])
>>> girls = sc.parallelize(["HanMeiMei","Lily"])
>>> boys.cartesian(girls).collect()
[('LiLei', 'HanMeiMei'), ('LiLei', 'Lily'), ('Tom', 'HanMeiMei'), ('Tom', 'Lily')]

# 按照某种方式排序,这里从小到大排序
>>> c = sc.parallelize([(1,2,3),(3,2,2),(4,1,1)])
>>> c.sortBy(lambda x: x[2]).collect()
[(4, 1, 1), (3, 2, 2), (1, 2, 3)]

# 按照拉链方式连接两个RDD,效果类似python的zip函数
# 需要两个RDD具有相同的分区,每个分区元素数量相同
>>> rdd_name = sc.parallelize(["LiLei","Hanmeimei","Lily"])
>>> rdd_age = sc.parallelize([19,18,20])
>>> rdd_name.zip(rdd_age).collect()
[('LiLei', 19), ('Hanmeimei', 18), ('Lily', 20)]
>>> rdd_name =  sc.parallelize(["LiLei","Hanmeimei","Lily","Lucy","Ann","Dachui","RuHua"])

# 将RDD和一个从0开始的递增序列按照拉链方式连接。
>>> rdd_name.zipWithIndex().collect()
[('LiLei', 0), ('Hanmeimei', 1), ('Lily', 2), ('Lucy', 3), ('Ann', 4), ('Dachui', 5), ('RuHua', 6)]

2.2. 常用 Action 操作

常用 action 算子:

  • collect:收集获取全部元素
  • count:统计数目
  • take:取几个元素,如:take(5)
  • reduce:累计计算
  • saveAsTextFile:保存到文件系统,可以保存到本地或 HDFS
  • foreach:循环元素,对每一个元素执行某种操作,不生成新的 RDD
  • takeSample(False, 10, 0):可以随机取若干个到Driver,第一个参数设置是否放回抽样
  • first():获取第一个数据

示例:

>>> data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> rdd = sc.parallelize(data)
>>> rdd.collect()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

>>> rdd.count()
10

>>> rdd.max()
10

>>> rdd.min()
1

>>> rdd.sum()
55

>>> rdd.reduce(lambda x, y: x+y)
55

>>> rdd.foreach(lambda x: print(x))
1
2
3
4
5
6
7
8
9
10

2.2.1 排序 sortBy

topN

students = [("HanMeiMei", 16, 77), ("DaChui", 16, 66), ("Jim", 18, 77), ("LiLei", 18, 87), ("RuHua", 18, 50)]
rdd = sc.parallelize(students)
rdd.sortBy(lambda x: x[2], ascending=False)

print(rdd.take(3))

[('LiLei', 18, 87), ('HanMeiMei', 16, 77), ('DaChui', 16, 66)]

2.2.2 countByKey

Pair RDDkey 统计数量:

pairRdd = sc.parallelize([(1, 1), (1, 4), (3, 9), (2, 16)])
rdd2 = pairRdd.countByKey()
print(rdd2)     # defaultdict(<class 'int'>, 1: 2, 3: 1, 2: 1)

2.3 常用PairRDD的转换操作

PairRDD 指的是数据为长度为2 的 tuple 类似 (k,v) 结构的数据类型的 RDD,其每个数据的第一个元素被当做key,第二个元素被当做 value

2.1.4 groupByKey

将相同的 key 分组,key-value 形式:

from pyspark import SparkConf, SparkContext

if __name__ == "__main__":
    conf = SparkConf().setMaster("local[2]").setAppName("spark0401")
    sc = SparkContext(conf=conf)
    
        def my_group_by_key():
        data = [什么是 Spark RDD ?

Spark核心-RDD

Spark之RDD算子-转换算子

Spark RDD 操作实战之文件读取

Spark RDD 操作

第2天Python实战Spark大数据分析及调度-RDD编程