Spark中的DataFrame是什么?以及如何构建DataFrame?(附案例)
Posted 奇迹虎虎
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark中的DataFrame是什么?以及如何构建DataFrame?(附案例)相关的知识,希望对你有一定的参考价值。
1、Spark中的DataFrame是什么?
官方解释:
DataFrame = RDD[Person] - 泛型 + Schema + SQL操作 + 优化
官方原文:A DataFrame is a DataSet organized into named columns.
中文翻译:以列(列名,列类型,列值)的形式构成的分布式的数据集。
用大白话讲:
在 Spark 中,DataFrame 是一种以 RDD 为基础的分布式数据集,是一种特殊的RDD,是一个分布式的表,类似于传统数据库中的二维表格。DataFrame 与 RDD 的主要区别在于,前者带有 schema 元信息,即 DataFrame 所表示的二维表数据集的每一列都带有名称和类型。
2、DataFrame中的Schema是什么?
解释:其实就是结构表的列名与列类型。
Schema 的两种定义方式:
- 使用 StructType 定义,是一个样例类,属性为 StructField 的数组
- 使用 StructField 定义,同样是一个样例类,有四个属性,其中字段名称和类型为必填
from pyspark.sql.types import *
# 构建 Schema 对象
schema=StructType([
StructField('name',StringType()),
StructField('age',IntegerType())
])
3、DataFrame中的Row是什么?
解释:DataFrame中每条数据封装在Row中,Row表示每行数据。
如何构建Row对象?
from pyspark.sql import Row
# 构建 Row 对象
Row(value1, value2, value3, ...)
4、构建DataFrame的几种方式:
4.1 通过 RDD 转换 DataFrame
4.1.1 通过 Row 构建 DataFrame
- 这种方法为使用反射方法Schema模式,Spark SQL 可以将 Row 对象的 RDD 转换为 DataFrame,从而推断数据类型。
from pyspark.sql import SparkSession
from pyspark import Row
if __name__ == '__main__':
# 创建上下文对象
spark = SparkSession.builder.appName('test').master('local[*]').getOrCreate()
sc = spark.sparkContext
# 创建RDD
rdd1 = sc.parallelize(['张三,30','李四,20','王五,50'])
# 将RDD的每个元素从String转成Row
rdd2 = rdd1.map(lambda x:Row(name=x.split(',')[0],age=int(x.split(',')[1])))
# 直接通过RDD的Row创建DataFrame
df = spark.createDataFrame(rdd2)
# 打印DataFrame
df.printSchema()
df.show()
# 关闭退出
spark.stop()
4.1.2 通过 StructedType 构建 DataFrame
- 从原始 RDD 创建元组或列表的 RDD。
- StructType 在步骤 1 中创建的 RDD 中创建由匹配的元组或列表结构表示的模式。
- 通过 createDataFrame 提供的方法将模式应用到 RDD SparkSession。
from pyspark.sql import SparkSession
from pyspark.sql.types import *
if __name__ == '__main__':
# 创建上下文对象
spark = SparkSession.builder.appName('test').master('local[*]').getOrCreate()
sc = spark.sparkContext
# 创建RDD
rdd1 = sc.parallelize(['张三,30','李四,20','王五,50'])
# 将RDD的每个元素从String转成Tuple
rdd2 = rdd1.map(lambda x:(x.split(',')[0],int(x.split(',')[1])))
# 为上述tuple量身定义schema
schema = StructType([
StructField('name',StringType()),
StructField('age',IntegerType())
])
# 通过RDD和Schema创建DataFrame
df = spark.createDataFrame(rdd2,schema)
# 打印DataFrame
df.printSchema()
df.show()
# 关闭退出
spark.stop()
4.1.3 通过 toDF 构建 DataFrame
from pyspark.sql import SparkSession
if __name__ == '__main__':
# 创建上下文对象
spark = SparkSession.builder.appName('test').master('local[*]').getOrCreate()
sc = spark.sparkContext
# 创建RDD
rdd1 = sc.parallelize(['张三,30','李四,20','王五,50'])
# 将RDD的每个元素从String转成Tuple
rdd2 = rdd1.map(lambda x:(x.split(',')[0],int(x.split(',')[1])))
# 调用toDF传输字段名称,直接创建DataFrame
df = rdd2.toDF(['name','age'])
# 打印DataFrame
df.printSchema()
df.show()
# 关闭退出
spark.stop()
4.1.4 通过 Pandas 构建 DataFrame
from pyspark.sql import SparkSession
import pandas as pd
from datetime import *
if __name__ == '__main__':
# 创建上下文对象
spark=SparkSession.builder.appName('test').master('local[*]').getOrCreate()
pdf=pd.DataFrame(
'a': [1, 2, 3],
'b': [2.9, 3.9, 4.9],
'c': ['string1', 'string2', 'string3'],
'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
)
print(pdf)
# spark.createDataFrame(pd.DataFrame)
df=spark.createDataFrame(pdf)
# 打印df的schema信息
df.printSchema()
# 打印df的行数据
df.show()
# 关闭退出
spark.stop()
4.2 读取外部数据 转化为 DataFrame
4.2.1 读取 Json 文件创建 DataFrame
from pyspark.sql import SparkSession
if __name__ == '__main__':
# 创建SparkSession入口
spark=SparkSession.builder.appName('test').master('local[*]').getOrCreate()
# spark.read读取json文件,并打印schema,和数据
df1=spark.read.json('file:///root/test.json')
df1.printSchema()
df1.show()
4.2.2 读取 parquet 列式存储格式文件创建 DataFrame
from pyspark.sql import SparkSession
if __name__ == '__main__':
# 创建SparkSession入口
spark=SparkSession.builder.appName('test').master('local[*]').getOrCreate()
# spark.read读取parquet文件,并打印schema,和数据
df2=spark.read.parquet('file:///root/test.parquet')
df2.printSchema()
df2.show()
4.2.3 读取 csv 文件创建 DataFrame
from pyspark.sql import SparkSession
if __name__ == '__main__':
# 1-创建SparkSession入口
spark=SparkSession.builder.appName('test').master('local[*]').getOrCreate()
# 2-spark.read读取csv文件,并打印schema,和数据
df3=spark.read.option('sep',';').option('header',True).option('inferSchema',True).csv('file:///root/test.csv')
df3.printSchema()
df3.show()
spark.stop()
4.3 加载文件时,什么时候用textFile?什么时候用read?
- 如果加载的数据结构化程度不高,则用 textFile 返回 RDD 再处理
from pyspark.sql import SparkSession if __name__ == '__main__': spark = SparkSession.builder.appName('test').master('local[*]').getOrCreate() sc = spark.sparkContext # 读取文件生成 RDD rdd1 = sc.textFile('file:///root/1.txt')
- 如果加载的数据结构化程度很高,比如 mysql 或 半结构化数据 json、csv,则用 read 返回 DataFrame 再处理
from pyspark.sql import SparkSession if __name__ == '__main__': spark = SparkSession.builder.appName('test').master('local[*]').getOrCreate() sc = spark.sparkContext # 读取文件生成 DataFrame(特殊RDD) rdd1 = spark.read.text('file:///root/1.txt')
以上是关于Spark中的DataFrame是什么?以及如何构建DataFrame?(附案例)的主要内容,如果未能解决你的问题,请参考以下文章