Spark之Dataframe基本操作
Posted 柳小葱
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark之Dataframe基本操作相关的知识,希望对你有一定的参考价值。
💗昨天介绍了spark的大致内容,今天来介绍spark中极为重要的结构Dataframe以及一系列的操作,本文也可以叫做pyspark教程,有兴趣的同学可以查看昨天的文章👇:
- Spark浅析: Hadoop之spark浅析.
💌今天我们来学习Spark的Dataframe类型,这一章节很重要,包括dataframe的增删改查,抽样和分割等操作。
这里写目录标题
1.Dataframe
1.1 Dataframe的定义
Dataframe由许多记录(record)组成,记录是ROW类型(与一个table中的行类似),一条记录由很多列组成,类可以表示在该Dataset中每个单独的记录上执行的计算表达式。模式定义了Dataframe列的名以及列的数据类型。Dataframe的分区定义了dataframe以及dataset在集群上的物理分布,而划分模式定义了partition的分配方式,你可以自定义分区的方式,也可以采用随机分配的方式。
例:在dbfs上导入数据构造一个dataframe
#json类型的文件
df=spark.read.format("json").load("/FileStore/tables/2015_summary.json")
df.show(5)
结果如下:
1.2 Dataframe的模式
模式定义了dataframe的列名以及列的数据类型,它可以由数据源来定义(schema-on-read),也可以由我们自己来显式定义。
例:查看dataframe的模式
spark.read.format("json").load("/FileStore/tables/2015_summary.json").schema
图中的模式是由许多structfield构成的structtype.说明了了dataframe每列的数据类型等相关信息。
例:创建一个dataframe并未其指定一个模式schema
#导入所需求的包
from pyspark.sql.types import StructField, StructType, StringType, LongType
#指定模式
myManualSchema = StructType([
StructField("DEST_COUNTRY_NAME", StringType(), True),
StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
StructField("count", LongType(), False, metadata{"hello":"world"})
])
#导入数据并指定模式
df = spark.read.format("json").schema(myManualSchema).load("/data/flight-data/json/2015-summary.json")
2.列
2.1 引用列
取某列用col(列名)或者column(列名)都可以
#col和column都可以
from pyspark.sql.functions import col,column
col("count")
df["count"]
结果如下:
2.2 列作为表达式
表达式: 表达式是对dataframe中某一记录的一个或者多个值的组转换。可以把它想象成一个函数,它将一个或者多个列作为输入,然后解析它们。一般通过expr函数来创建表达式。
#某列减5的操作
from pyspark.sql.functions import expr
col("somecol")-5
expr("somecol-5")
2.3查看所有列
spark.read.format("json").load("/FileStore/tables/2015_summary.json").columns
结果如下:
3.行和记录
行和记录是一个东西,但是我们经常使用记录。
3.1 查看第一行
df.first()
结果如下:
3.2 创建记录
使用Row函数来创建记录
from pyspark.sql import Row
myrow=Row("Hello",None,1,False)
4.Dataframe转换操作
4.1 通过Row建立dataframe
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType
#建立模式
myManualSchema = StructType([
StructField("some", StringType(), True),
StructField("col", StringType(), True),
StructField("names", LongType(), False)
])
#建立记录对象
myRow = Row("Hello", None, 1)
#组合成dataframe
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()
创建成功
4.2 select和selectExpr
select和selectExpr函数在Dataframe上执行类似数据表的SQL查询。
#选取某列
df.select("DEST_COUNTRY_NAME").show(2)
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)
结果如下:
使select和expr函数给列改名
#将DEST_COUNTRY_NAME改为destination
from pyspark.sql.functions import expr
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)
结果如下:
select和expr函数的组合等于selectExpr
#将赋予别名
df.selectExpr("DEST_COUNTRY_NAME AS destination2").show(2)
结果如下:
使用selectExpr来使用函数创建新的列
#判断两列结果是否相等并输出新的一列
df.selectExpr(
"*", # all original columns
"(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry").show(2)
结果如下:
使用selectExpr和聚合函数来显示结果
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)
结果如下:
5.转换成spark类型
有时需要给spark显示传递值,它是一个值而并非一个新列。使用lit可以将变量转换为literal变量(spark可以理解的值)我们换一种说法,即我们需要比较dataframe的某些值是否大于一个常量或者程序创建的变量时,可以使用该方法。
#增加一列one,值全为1
from pyspark.sql.functions import lit
df.select(expr("*"), lit(1).alias("One")).show(2)
结果如下:
6.添加列
使用WithColumn可以为dataframe增加一列
#增加一列全1的列
df.withColumn("numberOne", lit(1)).show(2)
可以增加表达式的列
#判断两列是否相等
df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME")).show(2)
结果如下;
7.重命名列
重命名列采用WithCloumnRennamed来重命名
#将DEST_COUNTRY_NAME改为dest
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns
结果如下:
8.删除列
使用drop关键字
df.drop("DEST_COUNTRY_NAME").columns
删除了DEST_COUNTRY_NAME列:
9.过滤行
过滤的方式有where和filter两种,但是where和SQL很像,我喜欢用where,但filter也一样。
df.filter(col('count')>10).show(3)
df.where('count>10').show(3)
结果一样:
过滤条件有很多and相连时,可以连写多个where
#连写多个where相当于多个条件and
df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia").show(2)
10.去重
去重还是distinct关键字
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()
结果如下:
11.随机抽样和随机分割
随机抽样采用的是sample函数,第一个参数指定是否放回抽样
seed = 5#随机种子
withReplacement = False#fales不放回;true为放回
fraction = 0.5
df.sample(withReplacement, fraction, seed).count()
随机分割使用的是randomSplit函数,划分训练集、测试集和验证集
#这里按照0.1,0.15,0.75划分数据
seed = 5
dataFrames = df.randomSplit([0.1,0.15,0.75], seed)
dataFrames[0].show()
dataFrames[1].show()
dataFrames[3].show()
12.增加行
Dataframe是不可变的,这意味着用户不能向dataframe中追加行,如果需要增加行,只有一种方式:将新的dataframe和旧的dataframe拼接起来,即union操作,但必须保证他们的关系模式是一样的。
#创建一个新的dataframe
from pyspark.sql import Row
schema = df.schema
newRows = [
Row("liuxiaocong", "Other Country", 222),
Row("liuxiaocong", "Other Country 3", 222)
]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)
#看df中是否添加成功
df.union(newDF).where("count = 222").show()
union成功。
13.行排序
使用关键字orderBy来排序asc升序、desc降序
#按某列排序默认升序
df.sort("count").show(5)
df.orderBy("count").show(5)
df.orderBy(col("count")).show(5)
升序或者降序排序
#升序或者降序
df.orderBy(col("count").asc()).show(5)#升序
df.orderBy(col("count").desc()).show(5)#降序
参考资料
《Hadoop权威指南》
《大数据hadoop3.X分布式处理实战》
《Spark权威指南》
《Pyspark实战》
以上是关于Spark之Dataframe基本操作的主要内容,如果未能解决你的问题,请参考以下文章
Spark2 DataFrame数据框常用操作之统计指标:mean均值,variance方差,stddev标准差,corr(Pearson相关系数),skewness偏度,kurtosis峰度((代码