Spark之Dataframe基本操作

Posted 柳小葱

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark之Dataframe基本操作相关的知识,希望对你有一定的参考价值。

💗昨天介绍了spark的大致内容,今天来介绍spark中极为重要的结构Dataframe以及一系列的操作,本文也可以叫做pyspark教程,有兴趣的同学可以查看昨天的文章👇:

💌今天我们来学习Spark的Dataframe类型,这一章节很重要,包括dataframe的增删改查,抽样和分割等操作。

1.Dataframe

1.1 Dataframe的定义

Dataframe由许多记录(record)组成,记录是ROW类型(与一个table中的行类似),一条记录由很多列组成,类可以表示在该Dataset中每个单独的记录上执行的计算表达式。模式定义了Dataframe列的名以及列的数据类型。Dataframe的分区定义了dataframe以及dataset在集群上的物理分布,而划分模式定义了partition的分配方式,你可以自定义分区的方式,也可以采用随机分配的方式。
例:在dbfs上导入数据构造一个dataframe

#json类型的文件
df=spark.read.format("json").load("/FileStore/tables/2015_summary.json")
df.show(5)

结果如下:
在这里插入图片描述

1.2 Dataframe的模式

模式定义了dataframe的列名以及列的数据类型,它可以由数据源来定义(schema-on-read),也可以由我们自己来显式定义。
例:查看dataframe的模式

spark.read.format("json").load("/FileStore/tables/2015_summary.json").schema

图中的模式是由许多structfield构成的structtype.说明了了dataframe每列的数据类型等相关信息。
在这里插入图片描述
例:创建一个dataframe并未其指定一个模式schema

#导入所需求的包
from pyspark.sql.types import StructField, StructType, StringType, LongType
#指定模式
myManualSchema = StructType([
  StructField("DEST_COUNTRY_NAME", StringType(), True),
  StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
  StructField("count", LongType(), False, metadata{"hello":"world"})
])
#导入数据并指定模式
df = spark.read.format("json").schema(myManualSchema).load("/data/flight-data/json/2015-summary.json")

2.列

2.1 引用列

取某列用col(列名)或者column(列名)都可以

#col和column都可以
from pyspark.sql.functions import col,column
col("count")
df["count"]

结果如下:
在这里插入图片描述

2.2 列作为表达式

表达式: 表达式是对dataframe中某一记录的一个或者多个值的组转换。可以把它想象成一个函数,它将一个或者多个列作为输入,然后解析它们。一般通过expr函数来创建表达式。

#某列减5的操作
from pyspark.sql.functions import expr
col("somecol")-5
expr("somecol-5")

2.3查看所有列

spark.read.format("json").load("/FileStore/tables/2015_summary.json").columns

结果如下:
在这里插入图片描述

3.行和记录

行和记录是一个东西,但是我们经常使用记录。

3.1 查看第一行

df.first()

结果如下:
在这里插入图片描述

3.2 创建记录

使用Row函数来创建记录

from pyspark.sql import Row
myrow=Row("Hello",None,1,False)

4.Dataframe转换操作

4.1 通过Row建立dataframe

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType
#建立模式
myManualSchema = StructType([
  StructField("some", StringType(), True),
  StructField("col", StringType(), True),
  StructField("names", LongType(), False)
])
#建立记录对象
myRow = Row("Hello", None, 1)
#组合成dataframe
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()

创建成功
在这里插入图片描述

4.2 select和selectExpr

select和selectExpr函数在Dataframe上执行类似数据表的SQL查询。

#选取某列
df.select("DEST_COUNTRY_NAME").show(2)
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)

结果如下:
在这里插入图片描述
使select和expr函数给列改名

#将DEST_COUNTRY_NAME改为destination
from pyspark.sql.functions import expr
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)

结果如下:
在这里插入图片描述
select和expr函数的组合等于selectExpr

#将赋予别名
df.selectExpr("DEST_COUNTRY_NAME AS destination2").show(2)

结果如下:
在这里插入图片描述
使用selectExpr来使用函数创建新的列

#判断两列结果是否相等并输出新的一列
df.selectExpr(
  "*", # all original columns
  "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry").show(2)

结果如下:
在这里插入图片描述
使用selectExpr和聚合函数来显示结果

df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)

结果如下:
在这里插入图片描述

5.转换成spark类型

有时需要给spark显示传递值,它是一个值而并非一个新列。使用lit可以将变量转换为literal变量(spark可以理解的值)我们换一种说法,即我们需要比较dataframe的某些值是否大于一个常量或者程序创建的变量时,可以使用该方法。

#增加一列one,值全为1
from pyspark.sql.functions import lit
df.select(expr("*"), lit(1).alias("One")).show(2)

结果如下:
在这里插入图片描述

6.添加列

使用WithColumn可以为dataframe增加一列

#增加一列全1的列
df.withColumn("numberOne", lit(1)).show(2)

在这里插入图片描述
可以增加表达式的列

#判断两列是否相等
df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME")).show(2)

结果如下;
在这里插入图片描述

7.重命名列

重命名列采用WithCloumnRennamed来重命名

#将DEST_COUNTRY_NAME改为dest
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns

结果如下:
在这里插入图片描述

8.删除列

使用drop关键字

df.drop("DEST_COUNTRY_NAME").columns

删除了DEST_COUNTRY_NAME列:
在这里插入图片描述

9.过滤行

过滤的方式有where和filter两种,但是where和SQL很像,我喜欢用where,但filter也一样。

df.filter(col('count')>10).show(3)
df.where('count>10').show(3)

结果一样:
在这里插入图片描述
过滤条件有很多and相连时,可以连写多个where

#连写多个where相当于多个条件and
df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia").show(2)

在这里插入图片描述

10.去重

去重还是distinct关键字

df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

结果如下:
在这里插入图片描述

11.随机抽样和随机分割

随机抽样采用的是sample函数,第一个参数指定是否放回抽样

seed = 5#随机种子
withReplacement = False#fales不放回;true为放回
fraction = 0.5
df.sample(withReplacement, fraction, seed).count()

随机分割使用的是randomSplit函数,划分训练集、测试集和验证集

#这里按照0.1,0.15,0.75划分数据
seed = 5
dataFrames = df.randomSplit([0.1,0.15,0.75], seed)
dataFrames[0].show()
dataFrames[1].show()
dataFrames[3].show()

12.增加行

Dataframe是不可变的,这意味着用户不能向dataframe中追加行,如果需要增加行,只有一种方式:将新的dataframe和旧的dataframe拼接起来,即union操作,但必须保证他们的关系模式是一样的。

#创建一个新的dataframe
from pyspark.sql import Row
schema = df.schema
newRows = [
  Row("liuxiaocong", "Other Country", 222),
  Row("liuxiaocong", "Other Country 3", 222)
]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)
#看df中是否添加成功
df.union(newDF).where("count = 222").show()

union成功。
在这里插入图片描述

13.行排序

使用关键字orderBy来排序asc升序、desc降序

#按某列排序默认升序
df.sort("count").show(5)
df.orderBy("count").show(5)
df.orderBy(col("count")).show(5)

在这里插入图片描述

升序或者降序排序

#升序或者降序
df.orderBy(col("count").asc()).show(5)#升序
df.orderBy(col("count").desc()).show(5)#降序

在这里插入图片描述

参考资料

《Hadoop权威指南》
《大数据hadoop3.X分布式处理实战》
《Spark权威指南》
《Pyspark实战》

以上是关于Spark之Dataframe基本操作的主要内容,如果未能解决你的问题,请参考以下文章

Spark-SQL之DataFrame操作大全

SparkSql之DataFrame操作

Spark之Dataframe基本操作

Spark2 DataFrame数据框常用操作之统计指标:mean均值,variance方差,stddev标准差,corr(Pearson相关系数),skewness偏度,kurtosis峰度((代码

Spark——DataFrame与RDD互操作方式

Spark2 DataFrame数据框常用操作之cube与rollup