[Pyspark]RDD常用方法总结

Posted sight-tech

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[Pyspark]RDD常用方法总结相关的知识,希望对你有一定的参考价值。

aggregate(zeroValue, seqOp, combOp)

  • 入参:

    • zeroValue表示一组初值 Tuple
    • seqOp表示在各个分区partition中进行 什么样的聚合操作,支持不同类型的聚合 Func
    • combOp表示将不同分区partition聚合后的结果再进行聚合,只能进行同类型聚合 Func
  • 返回:

    • 聚合后的结果,不是RDD,是一个python对象

下面是对一组数进行累加,并计算数据的长度的例子

    # sum, sum1, sum2 的数据类型跟zeroValue一样, 是一个tuple(int, int)
    seqOp = (lambda sum, item: (sum[0] + item, sum[1] + 1))
    combOp = (lambda sum1, sum2: (sum1[0] + sum2[0], sum1[1] + sum2[1]))
    result = sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)

    print(result) # (10, 4)

aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions, partitionFunc)

基本跟aggregate类似,在相同的key下进行聚合操作

  • 入参:

    • zeroValue表示一组初值 Tuple
    • seqFunc表示在各个分区partition中进行 什么样的聚合操作,支持不同类型的聚合 Func
    • combFunc表示将不同分区partition聚合后的结果再进行聚合,只能进行同类型聚合 Func
    • numPartitions表示需要将此操作分割成多少个分区
    • partitionFunc自定义分区方法
  • 返回:

    • 聚合后的结果是一个RDD,不再是一个python对象, 需要调用collect()方法取回

下面是对一队成员的成绩进行累加,并计算成员的总分和参加科目的总数

    seqFunc = (lambda sum, item: (sum[0] + item, sum[1] + 1))
    combFunc = (lambda sum1, sum2: (sum1[0] + sum2[0], sum1[1] + sum2[1]))
    result = sc.parallelize(
        [("A", 83), ("A", 74), ("A", 91), ("A", 82),
         ("B", 69), ("B", 62), ("B", 97), ("B", 80), ("B", 60),
         ("C", 78), ("C", 73), ("C", 68)])         .aggregateByKey((0, 0), seqFunc, combFunc)

    print(result.collect()) # [(‘B‘, (368, 5)), (‘C‘, (219, 3)), (‘A‘, (330, 4))]

cache()

将RDD结果存储在内存中,以便再次利用

以下两条语句相等

    result = sc.parallelize([1, 2, 3, 4]).cache()
    result2 = sc.parallelize([1, 2, 3, 4])         .persist(storageLevel=StorageLevel.MEMORY_ONLY)

cartesian(rdd)

返回自己与传入rdd的笛卡尔积

  • 入参:

    • rdd表示一个rdd对象,可以存储不同数据类型 RDD
  • 返回:

    • 返回的结果是一个RDD
    num_rdd = sc.parallelize([1, 2])
    str_rdd = sc.parallelize([‘a‘, ‘y‘])
    result = num_rdd.cartesian(str_rdd)

    print(result.collect()) # [(1, ‘a‘), (1, ‘y‘), (2, ‘a‘), (2, ‘y‘)]

coalesce(numPartitions, shuffle)

常用于压缩任务,当分区过多时,将造成并行计算效率降低,调度器在不同分区中频繁切换,没有充分时间去完成计算任务。

  • 入参:

    • numPartitions表示需要将此操作压缩成多少个分区 Int
    • shuffle表示是否平均分给每个分区,并不是对数据进行打乱 Boolean
      (因为数据的偏斜, 也将影响并行计算的效率, 简单理解=> 木桶效应)
  • 返回:

    • 返回的结果是一个RDD

    num_rdd = sc.parallelize([i for i in range(0, 12)], 5)
    print(num_rdd.glom().collect()) # [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9, 10, 11]]

    new_rdd = num_rdd.coalesce(2, shuffle=True)
    print(new_rdd.glom().collect()) # [[0, 1, 4, 5, 6, 7], [2, 3, 8, 9, 10, 11]]

    new_rdd2 = num_rdd.coalesce(2, shuffle=False)
    print(new_rdd2.glom().collect()) # [[0, 1, 2, 3], [4, 5, 6, 7, 8, 9, 10, 11]]

cogroup(rdd, numPartitions)

将两个RDD中相同key进行合并,

  • 入参:

    • rdd表示一个rdd对象,可以存储不同数据类型 RDD
    • numPartitions表示需要将此操作压缩成多少个分区 Int
  • 返回:

    • 返回的结果是一个RDD
    x = sc.parallelize([("a", 1), ("b", 4)])
    y = sc.parallelize([("a", 2), ("y", 4)])
    z = x.cogroup(y)         .map(lambda item: (item[0], list(item[1][0]), list(item[1][1])))

    print(z.collect()) # [(‘b‘, [4], []), (‘y‘, [], [4]), (‘a‘, [1], [2])]

collect()

将数据以List取回本地
(官网)[https://spark.apache.org/docs/latest/api/python/pyspark.html]提示,建议只在任务结束时在调用collect方法,否则很容易OOM

  • 返回:
    • 返回的结果是一个List

collectAsMap()

将数据以key-value对的形式取回本地

  • 返回:
    • 返回的结果是一个Dict

combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions, partitionFunc)

基本跟aggregate类似,在相同的key下进行聚合操作, 计算过程发生在Driver端

  • 入参:
    • createCombiner表示一个处理初值的函数 Func
    • mergeValue表示在所在的Map节点上进行什么样的聚合操作,支持不同类型的聚合 Func
    • mergeCombiners表示将不同Map节点上相同key聚合后的结果再进行聚合,只能进行同类型聚合 Func
    • numPartitions表示需要将此操作分割成多少个分区
    • partitionFunc自定义分区方法
    init = (lambda val: [val])
    seqFunc = (lambda sum_list, item: sum_list + [item])
    combFunc = (lambda sum_list1, sum_list2: sum_list1 + sum_list2)
    result = sc.parallelize(
        [("A", 83), ("A", 74), ("A", 91), ("A", 82),
         ("B", 69), ("B", 62), ("B", 97), ("B", 80), ("B", 60),
         ("C", 78), ("C", 73), ("C", 68)])         .combineByKey(init, seqFunc, combFunc)

    print(result.collect()) 
    # [(‘B‘, [69, 62, 97, 80, 60]), (‘C‘, [78, 73, 68]), (‘A‘, [83, 74, 91, 82])]

count()

返回RDD内存储的数据长度(List形式)

  • 返回:
    • 返回的结果是一个Int

countApprox(timeout, confidence)

计算结果的估计数量;返回在timeout时间内完成的计算任务 的数据长度(List形式)

  • 入参:

    • timeout表示一个最大的计算时间 (毫秒) Int
    • confidence表示置信区间 Float
  • 返回:

    • 返回的结果是一个Int
    rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    print(rdd.countApprox(100)) # 3

countByKey()

返回每个key对应的元素数量

  • 返回:
    • 返回的结果是一个Dict
    rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    print(rdd.countByKey()) # defaultdict(<class ‘int‘>, {‘a‘: 2, ‘b‘: 1})

countByValue()

返回每个value出现的次数

  • 返回:
    • 返回的结果是一个Dict
    rdd2 = sc.parallelize([1, 2, 1, 2, 2], 2)
    print(rdd2.countByValue())  # defaultdict(<class ‘int‘>, {1: 2, 2: 3})

distinct()

遍历全部元素,并返回包含的不同元素的总数

  • 入参:

    • numPartitions表示需要将此操作分割成多少个分区
  • 返回:

    • 返回的结果是一个Int

filter(func)

遍历全部元素,筛选符合传入方法的元素

  • 入参:

    • func表示需要应用到每个元素的筛选方法
  • 返回:

    • 返回的结果是一个RDD
    rdd = sc.parallelize([1, 2, 3, 4, 5])
    rdd.filter(lambda x: x % 2 == 0)
    print(rdd.collect()) # [2, 4]

flatMap(func, preservesPartitioning)

遍历全部元素,将传入方法应用到每个元素上,并将最后结果展平(压成一个List)

  • 入参:

    • func表示需要应用到每个元素的方法
    • preservesPartitioning是否保持当前分区方式,默认重新分区
  • 返回:

    • 返回的结果是一个RDD
    rdd = sc.parallelize([2, 3, 4])
    sorted(rdd.flatMap(lambda x: range(1, x)).collect()) #[1, 1, 1, 2, 2, 3]
    sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect()) #[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]

flatMapValues(func)

遍历某个元素的元素值,将传入方法应用到每个元素值上,并将最后结果展平(压成一个List)

  • 入参:

    • func表示需要应用到每个元素值的方法
  • 返回:

    • 返回的结果是一个RDD
    x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
    x.flatMapValues(lambda val: val).collect() # [(‘a‘, ‘x‘), (‘a‘, ‘y‘), (‘a‘, ‘z‘), (‘b‘, ‘p‘), (‘b‘, ‘r‘)]

fold(zeroValue, func)

fold()与reduce()类似,接收与reduce接收的函数签名相同的函数,另外再加上一个初始值作为第一次调用的结果。(例如,加法初始值应为0,乘法初始值应为1)

  • 入参:

    • zeroValue表示一组初值 Tuple
    • func表示需要应用到每个元素上的方法 Func
  • 返回:

    • 聚合后的结果,不是RDD,是一个python对象
    x = sc.parallelize([1, 2, 3, 4, 5])
    x.fold(0, add) # 15

foldByKey(zeroValue, func, numPartitions, partitionFunc)

基本跟fold()类似,在相同的key下进行聚合操作

  • 入参:

    • zeroValue表示一组初值 Tuple
    • func表示需要应用到每个元素上的方法 Func
    • numPartitions表示需要将此操作分割成多少个分区
    • partitionFunc自定义分区方法
  • 返回:

    • 返回的结果是一个RDD
    x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    x.foldByKey(0, add).collect() # [(‘a‘, 2), (‘b‘, 1)]

foreach(func)

用于遍历RDD中的元素,将函数func应用于每一个元素。

  • 入参:

    • func表示需要应用到每个元素的方法, 但这个方法不会在客户端执行
  • 返回:

    • 返回的结果是一个RDD
    def f(x): print(x)
    sc.parallelize([1, 2, 3, 4, 5]).foreach(f)

foreachPartition(func)

遍历某个分区下的全部元素,将函数func应用于每一个元素。

  • 入参:

    • func表示需要应用到每个元素的方法, 但这个方法不会在客户端执行
  • 返回:

    • 返回的结果是一个RDD
    def f(iterator):
      for x in iterator:
           print(x)
    sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)

glom()

按分区对元素进行聚合, 返回一个二维列表

  • 返回:
    • 返回的结果是一个RDD
    rdd = sc.parallelize([1, 2, 3, 4], 2)
    sorted(rdd.glom().collect()) # [[1, 2], [3, 4]]

groupyBy(func, numPartitions, partitionFunc)

这个算子接收一个Func,应用函数后的返回值作为key,然后通过这个key来对里面的元素进行分组。

  • 入参:

    • func表示需要应用到每个元素上的方法 Func
    • numPartitions表示需要将此操作分割成多少个分区
    • partitionFunc自定义分区方法
  • 返回:

    • 返回的结果是一个RDD
    rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
    result = rdd.groupBy(lambda x: x % 2).collect()
    sorted([(x, sorted(y)) for (x, y) in result]) # [(0, [2, 8]), (1, [1, 1, 3, 5])]

groupByKey(numPartitions, partitionFunc)

与groupBy类似,不需要再传入func

groupWith(rdd, *rdd)

cogroup的加强版,可以用于多于两个的RDD合并

  • 入参:

    • rdd表示一个rdd对象,可以存储不同数据类型 RDD
    • *rdd表示一个rdd可变列表对象,可以是多个RDD对象 RDD
  • 返回:

    • 返回的结果是一个RDD
    x = sc.parallelize([("a", 1), ("b", 4)])
    y = sc.parallelize([("a", 2), ("y", 4)])
    w = sc.parallelize([("c", 3), ("a", 6)])
    z = x.groupWith(y, w)         .map(lambda item: (item[0], list(item[1][0]), list(item[1][1])))
    print(z.collect()) # [(‘b‘, [4], []), (‘y‘, [], [4]), (‘a‘, [1], [2]), (‘c‘, [], [])]

join(rdd, numPartitions)

内连接,将两个RDD中具有相同的key时进行连接

  • 入参:

    • rdd表示一个rdd对象,可以存储不同数据类型 RDD
    • numPartitions表示需要将此操作分割成多少个分区
  • 返回:

    • 返回的结果是一个RDD
    x = sc.parallelize([("a", 1), ("b", 4)])
    y = sc.parallelize([("a", 2), ("a", 3)])
    sorted(x.join(y).collect()) # [(‘a‘, (1, 2)), (‘a‘, (1, 3))]

leftOuterJoin(other, numPartitions=None)

左外连接, 与join类似

lookup(key)

  • 入参:

    • func表示需要应用到每个元素的方法
  • 返回:

    • 返回的结果是一个RDD

map(func, preservesPartitioning)

  • 入参:

    • func表示需要应用到每个元素的方法
  • 返回:

    • 返回的结果是一个RDD
    

mapPartitions(func, preservesPartitioning)

  • 入参:

    • func表示需要应用到每个元素的方法
  • 返回:

    • 返回的结果是一个RDD
    

mapPartitionsWithIndex(func, preservesPartitioning)

  • 入参:

    • func表示需要应用到每个元素的方法
  • 返回:

    • 返回的结果是一个RDD
    

mapValues(func)

  • 入参:

    • func表示需要应用到每个元素的方法
  • 返回:

    • 返回的结果是一个RDD
    

reduceByKeyLocally(func)

  • 入参:

    • func表示需要应用到每个元素的方法
  • 返回:

    • 返回的结果是一个RDD
    

partitionBy(numPartitions, partitionFunc)

  • 入参:

    • func表示需要应用到每个元素的方法
  • 返回:

    • 返回的结果是一个RDD
    

persist(storageLevel)

  • 入参:

    • func表示需要应用到每个元素的方法
  • 返回:

    • 返回的结果是一个RDD
    

pipe(command, env=None, checkCode=False)

  • 入参:

    • func表示需要应用到每个元素的方法
  • 返回:

    • 返回的结果是一个RDD
    

reduce(func)

  • 入参:

    • func表示需要应用到每个元素的方法
  • 返回:

    • 返回的结果是一个RDD
    

reduceByKey(func, numPartitions=None, partitionFunc)

  • 入参:

    • func表示需要应用到每个元素的方法
  • 返回:

    • 返回的结果是一个RDD
    

reduceByKeyLocally(func)

  • 入参:

    • func表示需要应用到每个元素的方法
  • 返回:

    • 返回的结果是一个RDD
    

repartition(numPartitions)

    

rightOuterJoin(other, numPartitions=None)

右外连接, 与join类似

sample(withReplacement, fraction, seed=None)

    

sortBy()

    

sortByKey()

    

takeOrdered()

    

takeSample()

    

zip()

    x = sc.parallelize(range(0,5))
    y = sc.parallelize(range(1000, 1005))
    x.zip(y).collect() # [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]

zipWithIndex()

    rdd = sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect()
    print(rdd.collect()) #[(‘a‘, 0), (‘b‘, 1), (‘c‘, 2), (‘d‘, 3)]

zipWithUniqueId()

    rdd = sc.parallelize(["a", "b", "c", "c", "e"], 3).zipWithUniqueId()
    print(rdd.collect())


以上是关于[Pyspark]RDD常用方法总结的主要内容,如果未能解决你的问题,请参考以下文章

pyspark3.0.0+spark3.0.0学习01(RDD的创建,RDD算子(常用Transformation算子 常用Action算子 分区操作算子)的学习及应用)

在 pyspark RDD 上显示分区

pyspark的RDD代码纪录

pyspark对应的scala代码PythonRDD类

在两个 Spark RDD(在 PySpark 中)上进行半连接的正确方法是啥?

pyspark:rdd.foreach(print)报错NameError