PySpark简介及DF数据处理操作总结

Posted 金融大数据分析应用

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了PySpark简介及DF数据处理操作总结相关的知识,希望对你有一定的参考价值。

一. PySpark 是 Spark 为 Python 开发者提供的 API。PySpark 是 Spark 为 Python 开发者提供的 API,位于 $SPARK_HOME/bin 目录,其依赖于 Py4J。随Spark 2.1.0发布的 Py4J位于 $SPARK_HOME/python/lib 目录,对应的版本是 0.10.4。

二. PySpark 包括的子模块:

1 pyspark.sql 模块

2 pyspark.streaming 模块

3 pyspark.ml 包

4 pyspark.mllib 包

三.  PySpark 提供的类:

1 pyspark.SparkConf: 提供了对一个 Spark 应用程序配置的操作方法。用于将各种Spark参数设置为键值对。


2 pyspark.SparkContext: 提供了应用与 Spark 交互的主入口点,表示应用与 Spark 集群的连接,基于这个连接,应用可以在该集群上创建 RDD 和 广播变量 (pyspark.Broadcast)


pyspark.SparkFiles: SparkFiles 只包含类方法,开发者不应创建 SparkFiles 类的实例  。


4 pyspark.RDD: 此类是为 PySpark 操作 RDD提供了基础方法 。first() 是 pyspark.RDD 类提供的方法,返回 RDD 的第一个元素。aggregate() 方法使用给定的组合函数和中性“零值,先聚合每个分区的元素,然后再聚合所有分区的结果。cache() 使用默认存储级别(MEMORY_ONLY)对此 RDD 进行持久化。collect() 返回一个列表,包含此 RDD 中所有元素 。


5 pyspark.Accumulator:一种“只允许添加”的共享变量,Spark 任务只能向其添加值。


6 pyspark.Broadcast:Spark 提供了两种共享变量 ,广播变量 和 累加器,pyspark.Broadcast 类提供了对广播变量的操作方法。


7  pyspark.Accumulator: 提供了对累加器变量的操作方法 。累加器是仅仅被相关操作累加的变量,因此可以在并行中被有效地支持。

四. PySpark 数据流转结构图:


PySpark简介及DF数据处理操作总结

五. DF数据处理操作:

1 运行时获取spark版本号

spark = SparkSession.builder.appName("PythonSQL").getOrCreate()print (spark.version)

2 获取spark配置情况

df = spark.sql("SET -v")df.show()

3 显示每列的所有内容,不删减内容显示,show每列所有内容

df.show(truncate=False)

4 Pandas和Spark的DataFrame两者互相转换

pandas_df = spark_df.toPandas()spark_df = sqlContext.createDataFrame(pandas_df)

5 与Spark RDD的相互转换

rdd_df = df.rdddf = rdd_df.toDF()

注:rdd转df前提是每个rdd的类型都是Row类型

6 新增列

from pyspark.sql import fdf = df.withColumn(“xx”, f.lit(0)).show()

7 fillna函数

df.na.fill() # 注意与scala用法不一样

8 以原有列为基础添加列

df = df.withColumn('count_add', df["count"] + 1)

9 增加序列标签

df.rdd.zipWithIndex()

10 删除一列

df.drop('age').collect()df.drop(df.age).collect()

11 dropna函数

df = df.na.drop() # 删除任何列包含na的行df = df.dropna(subset=['col_name1', 'col_name2'])

12 修改原有df[“AA”]列的所有值

df = df.withColumn(“AA”, 1)

13 强制修改列的类型

df = df.withColumn("year2", df["year1"].cast("Int"))

14 合并2个表的join方法

df_join = df_1.join(df_2, df_1.key == df_2.key, "left")

其中方法可以为:`inner`, `outer`, `left_outer`, `right_outer`, `left` etc.

15 groupBy方法整合

GD = df.groupBy(“age”) #应用单个函数(按照A列同名的进行分组,组内对B列进行均值计算来合并)df.groupBy(“A”).avg(“B”).show() #应用多个函数:from pyspark.sql import fdf.groupBy(“A”).agg(functions.avg(“B”), f.min(“B”), f.max(“B”)).show()

整合后GD类型可用的方法(均返回DF类型):

1 avg(*cols):计算每组中一列或多列的平均值
2 count() : 计算每组中一共有多少行
3 max(*cols): 计算每组中一列或多列的最大值
4 mean(*cols) :计算每组中一列或多列的平均值
5 min(*cols):计算每组中一列或多列的最小值
6 sum(*cols):计算每组中一列或多列的总和


16 将df的每一列应用函数f

df.foreach(f) |df.rdd.foreach(f)

17 将df的每一块应用函数f

df.foreachPartition(f) |df.rdd.foreachPartition(f)

18 Map和Reduce返回类型seqRDDs

df.map(func)df.reduce(func)

19 解决中文乱码问题

import sysreload(sys)sys.setdefaultencoding('utf-8')

20 行元素查询

df.show()df.show(30) # 行数,默认20

21 以树的形式打印概要

df.printSchema()

22 获取头几行

list = df.head(3) list = df.take(5)# 返回列表,元素类型为Row

23 查询总行数

df.count()

24 查询某列为null的行

from pyspark.sql.functions import isnulldf = df.filter(isnull("columx"))

25 获取Row元素的所有列名

r = Row(age=11, name='Alice')print r.__fields__ # 字段列表

26 选择一列或多列方式

df.select(“name”)df.select(df[‘name’], df[‘age’]+1)df.select(df.a, df.b, df.c) # 选择a、b、c三列df.select(df["a"], df["b"], df["c"]) # 选择a、b、c三列

27 排序

df = df.sort("age", ascending=False)

28 过滤数据(where同filter)

df = df.filter(df['age']>21)df = df.where(df['age']>21)# 对null或nan数据进行过滤from pyspark.sql.functions import isnan, isnulldf = df.filter(isnull("a")) # 把a列里面数据为null的筛选出来(代表python的None类型)df = df.filter(isnan("a")) # 把a列里面数据为nan(Not a Number,非数字数据)的筛选出来

29 DF注册成SQL的表

df.createOrReplaceTempView("view1")

30 进行SQL查询返回DF

conf = SparkConf()spark = SparkSession.builder.appName("APP_NAME").config(conf=conf).getOrCreate()df = spark.sql(“SELECT XX,... FROM table WHERE xx=xx″)

时间序列操作(Window)请关注后续专题总结。PyCharm下PySpark开发环境搭建


推荐阅读


(点击标题可跳转阅读)


2




以上是关于PySpark简介及DF数据处理操作总结的主要内容,如果未能解决你的问题,请参考以下文章

pyspark 数据帧上的向量操作

为啥 df.limit 在 Pyspark 中不断变化?

pyspark 数据框连接操作和写为 json 非常慢

PySpark sql 中一些函数的总结(持续更新)

pyspark 知识点

大型数据框上的 Pyspark groupBy