Python PySpark RDD Cheat Sheet

Initializing SparkContext
>>> from pyspark import SparkContext
>>> sc = SparkContext(master = 'local[2]')

Configurations
>>> from pyspark import SparkConf, SparkContext
>>> conf = (SparkConf()
           .setMaster("local[2]")
           .setAppName("Edureka CheatSheet")
           .set("spark.executor.memory", "1g"))
>>> sc = SparkContext(conf = conf)
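Note that only one SparkContext can be active at a time, so if a context is already running (for example the one created above), stop it before building a new one from a custom SparkConf:
>>> sc.stop()   # stop the existing SparkContext before creating another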

# SparkContext Version
>>> sc.version
# Python Version
>>> sc.pythonVer
# Application Name
>>> sc.appName
# Application ID
>>> sc.applicationId 
# Master URL
>>> sc.master 
# Installed Spark Path
>>> str(sc.sparkHome) 
# Retrieve Spark User Currently using SparkContext
>>> str(sc.sparkUser())
# Get default level of Parallelism
>>> sc.defaultParallelism 
# Get minimum number of Partitions
>>> sc.defaultMinPartitions

# Using Parallelized Collections
>>> rdd = sc.parallelize([('Jim',24),('Hope', 25),('Sue', 26)])
>>> rdd = sc.parallelize([('a',9),('b',7),('c',10)])
>>> num_rdd = sc.parallelize(range(1,5000))

# From other RDDs
>>> new_rdd = rdd.groupByKey()
>>> new_rdd = rdd.map(lambda x: (x,1))

# From a text file
>>> tfile_rdd = sc.textFile("/path/of_file/*.txt")

# Reading a directory of text files
>>> tfile_rdd = sc.wholeTextFiles("/path/of_directory/")
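Note the difference in what each call returns: textFile yields an RDD of individual lines, while wholeTextFiles yields an RDD of (filePath, fileContent) pairs. A minimal sketch, assuming a hypothetical directory of small text files:
>>> lines_rdd = sc.textFile("/path/of_directory/*.txt")
>>> lines_rdd.first()            # a single line of text from one of the files
>>> pairs_rdd = sc.wholeTextFiles("/path/of_directory/")
>>> pairs_rdd.keys().first()     # the full path of one of the files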

# Maximum Value of RDD elements
>>> rdd.max()
# Minimum Value of RDD elements
>>> rdd.min()
# Mean value of RDD elements
>>> rdd.mean()
# Standard Deviation of RDD elements
>>> rdd.stdev()
# Get the Summary Statistics (Count, Mean, Stdev, Max & Min)
>>> rdd.stats()

# Number of Partitions
>>> rdd.getNumPartitions()
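A quick sanity check of these actions on a small numeric RDD (outputs below are for this data; stats() returns a StatCounter whose printed format may vary across Spark versions):
>>> num_rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> num_rdd.max()
4
>>> num_rdd.min()
1
>>> num_rdd.mean()
2.5
>>> num_rdd.stdev()           # population standard deviation, approximately 1.118
>>> num_rdd.getNumPartitions()
2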

# map
Return a new RDD by applying a function to each element of this RDD
>>> rdd = sc.parallelize(["b", "a", "c"])
>>> sorted(rdd.map(lambda x: (x, 1)).collect())
[('a', 1), ('b', 1), ('c', 1)]

# flatMap
Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
>>> rdd = sc.parallelize([2, 3, 4])
>>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
[1, 1, 1, 2, 2, 3]

# mapPartitions
Return a new RDD by applying a function to each partition of this RDD.
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> def f(iterator): yield sum(iterator)
>>> rdd.mapPartitions(f).collect()
[3, 7]

# filter
Return a new RDD containing only the elements that satisfy a predicate.
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2, 4]

# distinct
Return a new RDD containing the distinct elements in this RDD.
>>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
[1, 2, 3]

# reduce
Reduces the elements of this RDD using the specified commutative 
and associative binary operator. Currently reduces partitions locally.
>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
15
>>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
10

# count
Return the number of elements in this RDD.
>>> sc.parallelize([2, 3, 4]).count()
3

# first
Return the first element in this RDD.
>>> sc.parallelize([2, 3, 4]).first()
2

# take
Take the first n elements of the RDD.
>>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
[2, 3]

# countByValue
Return the count of each unique value in this RDD as a 
dictionary of (value, count) pairs.
>>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
[(1, 2), (2, 3)]

# sortBy
Sorts this RDD by the given keyfunc
>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
>>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
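The keyfunc can just as well select the value, sorting the pairs by value instead of by key:
>>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
[('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]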

# sortByKey
Sorts this RDD, which is assumed to consist of (key, value) pairs.
>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
>>> sc.parallelize(tmp).sortByKey(True, 1).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]

# groupBy
Return an RDD of grouped items.
>>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
>>> result = rdd.groupBy(lambda x: x % 2).collect()
>>> sorted([(x, sorted(y)) for (x, y) in result])
[(0, [2, 8]), (1, [1, 1, 3, 5])]

# groupByKey 
Group the values for each key in the RDD into a single sequence.
>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(x.groupByKey().mapValues(list).collect())
[('a', [1, 1]), ('b', [1])]

# fold
Aggregate the elements of each partition, and then the results for 
all the partitions, using a given associative function and a neutral "zero value."
>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
15

# __add__
Return the union of this RDD and another one.
>>> rdd = sc.parallelize([1, 1, 2, 3])
>>> (rdd + rdd).collect()
[1, 1, 2, 3, 1, 1, 2, 3]

# subtract
Return each value in self that is not contained in other.
>>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
>>> y = sc.parallelize([("a", 3), ("c", None)])
>>> sorted(x.subtract(y).collect())
[('a', 1), ('b', 4), ('b', 5)]

# union
Return the union of this RDD and another one.
>>> rdd = sc.parallelize([1, 1, 2, 3])
>>> rdd.union(rdd).collect()
[1, 1, 2, 3, 1, 1, 2, 3]

# intersection
Return the intersection of this RDD and another one
>>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
>>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
>>> rdd1.intersection(rdd2).collect()
[1, 2, 3]

# cartesian
Return the Cartesian product of this RDD and another one.
>>> rdd = sc.parallelize([1, 2])
>>> sorted(rdd.cartesian(rdd).collect())
[(1, 1), (1, 2), (2, 1), (2, 2)]

# saveAsTextFile
Save this RDD as a text file, using string representations of elements.
>>> rdd.saveAsTextFile("rdd.txt")
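The saved output can be read back with textFile, but the elements come back as strings and need to be re-parsed. A minimal sketch, assuming the "rdd.txt" directory written above:
>>> sc.textFile("rdd.txt").map(int).collect()   # re-parse each line back to an integer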

# saveAsHadoopFile
Output a Python RDD of key-value pairs (of form RDD[(K, V)]) to any Hadoop file system
>>> rdd.saveAsHadoopFile("hdfs://namenodehost/parent_folder/child_folder",'org.apache.hadoop.mapred.TextOutputFormat')
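saveAsHadoopFile expects a pair RDD. A minimal sketch using the same TextOutputFormat and a hypothetical HDFS output path:
>>> pairs = sc.parallelize([("a", 1), ("b", 2)])
>>> pairs.saveAsHadoopFile("hdfs://namenodehost/out_dir", "org.apache.hadoop.mapred.TextOutputFormat")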

# saveAsPickleFile 
Save this RDD as a SequenceFile of serialized objects
>>> from tempfile import NamedTemporaryFile
>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tmpFile.name, 3)
>>> sorted(sc.pickleFile(tmpFile.name, 5).map(str).collect())
['1', '2', 'rdd', 'spark']
