PySpark DataFrame Basic Operations

Posted by CodeKangWorld

DataFrame Basic Operations

1. select()

  1. The select function selects one or more columns of a DataFrame and returns a new DataFrame.

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

data = [("James","Smith","USA","CA"),
        ("Michael","Rose","USA","NY"),
        ("Robert","Williams","USA","CA"),
        ("Maria","Jones","USA","FL")]
columns = ["firstname","lastname","country","state"]
df = spark.createDataFrame(data=data, schema=columns)
df.show(truncate=False)

  • Select a single column

df.select("firstname").show()

  • Select multiple columns

df.select("firstname","lastname").show()

  • Selecting nested columns

data = [ (("James",None,"Smith"),"OH","M"), (("Anna","Rose",""),"NY","F"), (("Julia","","Williams"),"OH","F"), (("Maria","Anne","Jones"),"NY","M"), (("Jen","Mary","Brown"),"NY","M"), (("Mike","Mary","Williams"),"OH","M") ]from pyspark.sql.types import StructType,StructField, StringType schema = StructType([ StructField('name', StructType([ StructField('firstname', StringType(), True), StructField('middlename', StringType(), True), StructField('lastname', StringType(), True) ])), StructField('state', StringType(), True), StructField('gender', StringType(), True) ])df2 = spark.createDataFrame(data = data, schema = schema)df2.printSchema()df2.show(truncate=False) # shows all columns

Select specific fields of a nested column

df2.select("name.firstname","name.lastname").show(truncate=False)

Access all fields of a nested column

df2.select("name.*").show(truncate=False)

2. collect()

collect gathers every row of the DataFrame back to the driver, so it should only be used on small datasets; calling collect on a large DataFrame can exhaust driver memory.

df2.collect()
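
collect() returns the rows as a list of Row objects on the driver. As a minimal sketch (using the df2 defined above; the field-access pattern is only illustrative), the collected rows can then be traversed like ordinary Python objects:

rows = df2.collect()
for row in rows:
    # nested struct columns come back as nested Row objects
    print(row.state, row.gender, row.name.firstname)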

3. withColumn()

The withColumn function updates an existing column of a DataFrame or adds a new one, returning a new DataFrame.

data = [('James','','Smith','1991-04-01','M',3000),
        ('Michael','Rose','','2000-05-19','M',4000),
        ('Robert','','Williams','1978-09-05','M',4000),
        ('Maria','Anne','Jones','1967-12-01','F',4000),
        ('Jen','Mary','Brown','1980-02-17','F',-1)]
columns = ["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data=data, schema=columns)

  • Change a column's data type

from pyspark.sql.functions import col

df2 = df.withColumn("salary", col("salary").cast("Integer"))
df2.printSchema()

  • Update column values

df3 = df.withColumn("salary", col("salary") * 100)
df3.printSchema()

  • Add new columns

from pyspark.sql.functions import lit

df4 = df.withColumn("CopiedColumn", col("salary") * -1)
df4.printSchema()

# lit adds a column with a constant value
df5 = df.withColumn("Country", lit("USA"))
df5.printSchema()

  • Rename a column

df.withColumnRenamed("gender","sex").show(truncate=False)
  • Drop a column

df4.drop("CopiedColumn").show(truncate=False)
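
drop also accepts several column names at once; a small sketch using the df defined above (which columns to drop is of course up to you):

df.drop("middlename", "dob").show(truncate=False)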

4. where() & filter()

where and filter are equivalent operations; both filter the rows of a DataFrame according to conditions on its columns.

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
from pyspark.sql.functions import col, array_contains

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

arrayStructureData = [
    (("James","","Smith"),["Java","Scala","C++"],"OH","M"),
    (("Anna","Rose",""),["Spark","Java","C++"],"NY","F"),
    (("Julia","","Williams"),["CSharp","VB"],"OH","F"),
    (("Maria","Anne","Jones"),["CSharp","VB"],"NY","M"),
    (("Jen","Mary","Brown"),["CSharp","VB"],"NY","M"),
    (("Mike","Mary","Williams"),["Python","VB"],"OH","M")
]

arrayStructureSchema = StructType([
    StructField('name', StructType([
        StructField('firstname', StringType(), True),
        StructField('middlename', StringType(), True),
        StructField('lastname', StringType(), True)
    ])),
    StructField('languages', ArrayType(StringType()), True),
    StructField('state', StringType(), True),
    StructField('gender', StringType(), True)
])

df = spark.createDataFrame(data=arrayStructureData, schema=arrayStructureSchema)
df.printSchema()
df.show(truncate=False)

# filter on a column condition
df.filter(df.state == "OH").show(truncate=False)
df.filter(col("state") == "OH").show(truncate=False)

# filter with a SQL expression string
df.filter("gender == 'M'").show(truncate=False)

# combine multiple conditions
df.filter((df.state == "OH") & (df.gender == "M")).show(truncate=False)

# filter on an array column
df.filter(array_contains(df.languages, "Java")).show(truncate=False)

# filter on a nested struct field
df.filter(df.name.lastname == "Williams").show(truncate=False)

5. dropDuplicates() & distinct()

Both are deduplication functions: distinct() removes fully duplicated rows, while dropDuplicates() can deduplicate either whole rows or just a specified subset of columns.

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

data = [("James", "Sales", 3000),
        ("Michael", "Sales", 4600),
        ("Robert", "Sales", 4100),
        ("Maria", "Finance", 3000),
        ("James", "Sales", 3000),
        ("Scott", "Finance", 3300),
        ("Jen", "Finance", 3900),
        ("Jeff", "Marketing", 3000),
        ("Kumar", "Marketing", 2000),
        ("Saif", "Sales", 4100)]
columns = ["employee_name", "department", "salary"]
df = spark.createDataFrame(data=data, schema=columns)
df.printSchema()
df.show(truncate=False)

  • Deduplicate whole rows

# deduplicate whole rows; returns a new DataFrame
distinctDF = df.distinct()
print("Distinct count: " + str(distinctDF.count()))
distinctDF.show(truncate=False)

df2 = df.dropDuplicates()
print("Distinct count: " + str(df2.count()))
df2.show(truncate=False)

  • Deduplicate on specified columns

# deduplicate on a subset of columns; returns a new DataFrame
dropDisDF = df.dropDuplicates(["department", "salary"])
print("Distinct count of department salary : " + str(dropDisDF.count()))
dropDisDF.show(truncate=False)

6. orderBy() & sort()

Both orderBy and sort order a DataFrame by the specified columns; the default order is ascending.

df.sort("department","state").show(truncate=False)df.sort(col("department"),col("state")).show(truncate=False)df.orderBy("department","state").show(truncate=False)df.orderBy(col("department"),col("state")).show(truncate=False)指定排序方式df.sort(df.department.asc(),df.state.asc()).show(truncate=False)df.sort(col("department").asc(),col("state").asc()).show(truncate=False)df.orderBy(col("department").asc(),col("state").asc()).show(truncate=False)df.sort(df.department.desc(),df.state.desc()).show(truncate=False)df.sort(col("department").desc(),col("state").desc()).show(truncate=False)df.orderBy(col("department").desc(),col("state").desc()).show(truncate=False)通过sql排序,先创建表视图,然后再编写sql语句进行排序df.createOrReplaceTempView("EMP")spark.sql("select employee_name,department,state,salary,age,bonus from EMP ORDER BY department asc").show(truncate=False)

7. groupBy()

groupBy is usually used together with aggregate functions.

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, max, min, mean, count

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

simpleData = [("James","Sales","NY",90000,34,10000),
              ("Michael","Sales","NY",86000,56,20000),
              ("Robert","Sales","CA",81000,30,23000),
              ("Maria","Finance","CA",90000,24,23000),
              ("Raman","Finance","CA",99000,40,24000),
              ("Scott","Finance","NY",83000,36,19000),
              ("Jen","Finance","NY",79000,53,15000),
              ("Jeff","Marketing","CA",80000,25,18000),
              ("Kumar","Marketing","NY",91000,50,21000)]
schema = ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data=simpleData, schema=schema)
df.printSchema()
df.show(truncate=False)

# simple aggregations: sum, count, max, min, etc.
df.groupBy("department").sum("salary").show(truncate=False)
df.groupBy("department").count().show(truncate=False)
df.groupBy("department", "state").sum("salary", "bonus").show(truncate=False)

# agg can compute several aggregations at once
df.groupBy("department") \
  .agg(sum("salary").alias("sum_salary"),
       avg("salary").alias("avg_salary"),
       sum("bonus").alias("sum_bonus"),
       max("bonus").alias("max_bonus")) \
  .show(truncate=False)

# filter on the aggregated result
df.groupBy("department") \
  .agg(sum("salary").alias("sum_salary"),
       avg("salary").alias("avg_salary"),
       sum("bonus").alias("sum_bonus"),
       max("bonus").alias("max_bonus")) \
  .where(col("sum_bonus") >= 50000) \
  .show(truncate=False)

8. join()

join supports several join types; the main ones are inner, outer (full), left (left_outer), right (right_outer), left_semi, left_anti and cross. The join key and join type are passed as arguments:

df = df1.join(df2, df1.key_id == df2.key_id, 'inner')
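
df1 and df2 above are placeholders. As a self-contained sketch, the made-up emp/dept DataFrames below (their contents and column names are illustrative, not from the original article) show a few common join types:

# hypothetical example data for demonstrating join types
emp = spark.createDataFrame(
    [(1, "James", 10), (2, "Maria", 20), (3, "Jen", 30)],
    ["emp_id", "name", "dept_id"])
dept = spark.createDataFrame(
    [(10, "Sales"), (20, "Finance"), (40, "Marketing")],
    ["dept_id", "dept_name"])

# inner: keep only matching keys
emp.join(dept, emp.dept_id == dept.dept_id, "inner").show(truncate=False)
# left (left_outer): keep every row of emp
emp.join(dept, emp.dept_id == dept.dept_id, "left").show(truncate=False)
# fullouter: keep every row from both sides
emp.join(dept, emp.dept_id == dept.dept_id, "fullouter").show(truncate=False)
# leftanti: rows of emp that have no match in dept
emp.join(dept, emp.dept_id == dept.dept_id, "leftanti").show(truncate=False)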

