PySpark简介及DF数据处理操作总结
Posted 金融大数据分析应用
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了PySpark简介及DF数据处理操作总结相关的知识,希望对你有一定的参考价值。
一. PySpark 是 Spark 为 Python 开发者提供的 API。PySpark 是 Spark 为 Python 开发者提供的 API,位于 $SPARK_HOME/bin 目录,其依赖于 Py4J。随Spark 2.1.0发布的 Py4J位于 $SPARK_HOME/python/lib 目录,对应的版本是 0.10.4。
二. PySpark 包括的子模块:
1 pyspark.sql 模块
2 pyspark.streaming 模块
3 pyspark.ml 包
4 pyspark.mllib 包
三. PySpark 提供的类:
1 pyspark.SparkConf: 提供了对一个 Spark 应用程序配置的操作方法。用于将各种Spark参数设置为键值对。
2 pyspark.SparkContext: 提供了应用与 Spark 交互的主入口点,表示应用与 Spark 集群的连接,基于这个连接,应用可以在该集群上创建 RDD 和 广播变量 (pyspark.Broadcast)
3 pyspark.SparkFiles: SparkFiles 只包含类方法,开发者不应创建 SparkFiles 类的实例 。
4 pyspark.RDD: 此类是为 PySpark 操作 RDD提供了基础方法 。first() 是 pyspark.RDD 类提供的方法,返回 RDD 的第一个元素。aggregate() 方法使用给定的组合函数和中性“零值,先聚合每个分区的元素,然后再聚合所有分区的结果。cache() 使用默认存储级别(MEMORY_ONLY)对此 RDD 进行持久化。collect() 返回一个列表,包含此 RDD 中所有元素 。
5 pyspark.Accumulator:一种“只允许添加”的共享变量,Spark 任务只能向其添加值。
6 pyspark.Broadcast:Spark 提供了两种共享变量 ,广播变量 和 累加器,pyspark.Broadcast 类提供了对广播变量的操作方法。
7 pyspark.Accumulator: 提供了对累加器变量的操作方法 。累加器是仅仅被相关操作累加的变量,因此可以在并行中被有效地支持。
四. PySpark 数据流转结构图:
五. DF数据处理操作:
1 运行时获取spark版本号
spark = SparkSession.builder.appName("PythonSQL").getOrCreate()
print (spark.version)
2 获取spark配置情况
df = spark.sql("SET -v")
df.show()
3 显示每列的所有内容,不删减内容显示,show每列所有内容
df.show(truncate=False)
4 Pandas和Spark的DataFrame两者互相转换
pandas_df = spark_df.toPandas()
spark_df = sqlContext.createDataFrame(pandas_df)
5 与Spark RDD的相互转换
rdd_df = df.rdd
df = rdd_df.toDF()
注:rdd转df前提是每个rdd的类型都是Row类型
6 新增列
from pyspark.sql import f
df = df.withColumn(“xx”, f.lit(0)).show()
7 fillna函数
df.na.fill() # 注意与scala用法不一样
8 以原有列为基础添加列
df = df.withColumn('count_add', df["count"] + 1)
9 增加序列标签
df.rdd.zipWithIndex()
10 删除一列
df.drop('age').collect()
df.drop(df.age).collect()
11 dropna函数
df = df.na.drop() # 删除任何列包含na的行
df = df.dropna(subset=['col_name1', 'col_name2'])
12 修改原有df[“AA”]列的所有值
df = df.withColumn(“AA”, 1)
13 强制修改列的类型
df = df.withColumn("year2", df["year1"].cast("Int"))
14 合并2个表的join方法
df_join = df_1.join(df_2, df_1.key == df_2.key, "left")
其中方法可以为:`inner`, `outer`, `left_outer`, `right_outer`, `left` etc.
15 groupBy方法整合
GD = df.groupBy(“age”)
#应用单个函数(按照A列同名的进行分组,组内对B列进行均值计算来合并)
df.groupBy(“A”).avg(“B”).show()
#应用多个函数:
from pyspark.sql import f
df.groupBy(“A”).agg(functions.avg(“B”), f.min(“B”), f.max(“B”)).show()
整合后GD类型可用的方法(均返回DF类型):
16 将df的每一列应用函数f
df.foreach(f) |
df.rdd.foreach(f)
17 将df的每一块应用函数f
df.foreachPartition(f) |
df.rdd.foreachPartition(f)
18 Map和Reduce返回类型seqRDDs
df.map(func)
df.reduce(func)
19 解决中文乱码问题
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
20 行元素查询
df.show()
df.show(30) # 行数,默认20
21 以树的形式打印概要
df.printSchema()
22 获取头几行
list = df.head(3)
list = df.take(5)
# 返回列表,元素类型为Row
23 查询总行数
df.count()
24 查询某列为null的行
from pyspark.sql.functions import isnull
df = df.filter(isnull("columx"))
25 获取Row元素的所有列名
r = Row(age=11, name='Alice')
print r.__fields__ # 字段列表
26 选择一列或多列方式
df.select(“name”)
df.select(df[‘name’], df[‘age’]+1)
df.select(df.a, df.b, df.c) # 选择a、b、c三列
df.select(df["a"], df["b"], df["c"]) # 选择a、b、c三列
27 排序
df = df.sort("age", ascending=False)
28 过滤数据(where同filter)
df = df.filter(df['age']>21)
df = df.where(df['age']>21)
# 对null或nan数据进行过滤
from pyspark.sql.functions import isnan, isnull
df = df.filter(isnull("a"))
# 把a列里面数据为null的筛选出来(代表python的None类型)
df = df.filter(isnan("a"))
# 把a列里面数据为nan(Not a Number,非数字数据)的筛选出来
29 DF注册成SQL的表
df.createOrReplaceTempView("view1")
30 进行SQL查询返回DF
conf = SparkConf()
spark = SparkSession.builder.appName("APP_NAME").config(conf=conf).getOrCreate()
df = spark.sql(“SELECT XX,... FROM table WHERE xx=xx″)
时间序列操作(Window)请关注后续专题总结。PyCharm下PySpark开发环境搭建。
推荐阅读
(点击标题可跳转阅读)
以上是关于PySpark简介及DF数据处理操作总结的主要内容,如果未能解决你的问题,请参考以下文章