Spark中的DataFrame是什么?以及如何构建DataFrame?(附案例)

Posted 奇迹虎虎

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark中的DataFrame是什么?以及如何构建DataFrame?(附案例)相关的知识,希望对你有一定的参考价值。

1、Spark中的DataFrame是什么?

官方解释:

DataFrame = RDD[Person] - 泛型 + Schema + SQL操作 + 优化

官方原文:A DataFrame is a DataSet organized into named columns.

中文翻译:以列(列名,列类型,列值)的形式构成的分布式的数据集。

用大白话讲:

在 Spark 中,DataFrame 是一种以 RDD 为基础的分布式数据集,是一种特殊的RDD,是一个分布式的表,类似于传统数据库中的二维表格。DataFrame 与 RDD 的主要区别在于,前者带有 schema 元信息,即 DataFrame 所表示的二维表数据集的每一列都带有名称和类型。


2、DataFrame中的Schema是什么?

解释:其实就是结构表的列名与列类型。

Schema 的两种定义方式:

  • 使用 StructType 定义,是一个样例类,属性为 StructField 的数组
  • 使用 StructField 定义,同样是一个样例类,有四个属性,其中字段名称和类型为必填
from pyspark.sql.types import *
# 构建 Schema 对象
schema=StructType([
    StructField('name',StringType()),
    StructField('age',IntegerType())
])

3、DataFrame中的Row是什么?

解释:DataFrame中每条数据封装在Row中,Row表示每行数据。

如何构建Row对象?

from pyspark.sql import Row
# 构建 Row 对象
Row(value1, value2, value3, ...)

4、构建DataFrame的几种方式:

4.1 通过 RDD 转换 DataFrame

下方4种方式的结果均是上图,不再逐一阐述

4.1.1 通过 Row 构建 DataFrame

  • 这种方法为使用反射方法Schema模式,Spark SQL 可以将 Row 对象的 RDD 转换为 DataFrame,从而推断数据类型。
from pyspark.sql import SparkSession
from pyspark import Row

if __name__ == '__main__':
    # 创建上下文对象
    spark = SparkSession.builder.appName('test').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    # 创建RDD
    rdd1 = sc.parallelize(['张三,30','李四,20','王五,50'])
    # 将RDD的每个元素从String转成Row
    rdd2 = rdd1.map(lambda x:Row(name=x.split(',')[0],age=int(x.split(',')[1])))
    # 直接通过RDD的Row创建DataFrame
    df = spark.createDataFrame(rdd2)
    # 打印DataFrame
    df.printSchema()
    df.show()
    # 关闭退出
    spark.stop()

4.1.2 通过 StructedType 构建 DataFrame

  1. 从原始 RDD 创建元组或列表的 RDD。
  2. StructType 在步骤 1 中创建的 RDD 中创建由匹配的元组或列表结构表示的模式。
  3. 通过 createDataFrame 提供的方法将模式应用到 RDD SparkSession。
from pyspark.sql import SparkSession
from pyspark.sql.types import *

if __name__ == '__main__':
    # 创建上下文对象
    spark = SparkSession.builder.appName('test').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    # 创建RDD
    rdd1 = sc.parallelize(['张三,30','李四,20','王五,50'])
    # 将RDD的每个元素从String转成Tuple
    rdd2 = rdd1.map(lambda x:(x.split(',')[0],int(x.split(',')[1])))
    # 为上述tuple量身定义schema
    schema = StructType([
        StructField('name',StringType()),
        StructField('age',IntegerType())
    ])
    # 通过RDD和Schema创建DataFrame
    df = spark.createDataFrame(rdd2,schema)
    # 打印DataFrame
    df.printSchema()
    df.show()
    # 关闭退出
    spark.stop()

4.1.3 通过 toDF 构建 DataFrame

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 创建上下文对象
    spark = SparkSession.builder.appName('test').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    # 创建RDD
    rdd1 = sc.parallelize(['张三,30','李四,20','王五,50'])
    # 将RDD的每个元素从String转成Tuple
    rdd2 = rdd1.map(lambda x:(x.split(',')[0],int(x.split(',')[1])))
    # 调用toDF传输字段名称,直接创建DataFrame
    df = rdd2.toDF(['name','age'])
    # 打印DataFrame
    df.printSchema()
    df.show()
    # 关闭退出
    spark.stop()

4.1.4 通过 Pandas 构建 DataFrame

from pyspark.sql import SparkSession
import pandas as pd
from datetime import *

if __name__ == '__main__':
    # 创建上下文对象
    spark=SparkSession.builder.appName('test').master('local[*]').getOrCreate()
    pdf=pd.DataFrame(
        'a': [1, 2, 3],
        'b': [2.9, 3.9, 4.9],
        'c': ['string1', 'string2', 'string3'],
        'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
        'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
    )
    print(pdf)
    # spark.createDataFrame(pd.DataFrame)
    df=spark.createDataFrame(pdf)
    # 打印df的schema信息
    df.printSchema()
    # 打印df的行数据
    df.show()
    # 关闭退出
    spark.stop()

4.2 读取外部数据 转化为 DataFrame

4.2.1 读取 Json 文件创建 DataFrame

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 创建SparkSession入口
    spark=SparkSession.builder.appName('test').master('local[*]').getOrCreate()
    # spark.read读取json文件,并打印schema,和数据
    df1=spark.read.json('file:///root/test.json')
    df1.printSchema()
    df1.show()

4.2.2 读取 parquet 列式存储格式文件创建 DataFrame

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 创建SparkSession入口
    spark=SparkSession.builder.appName('test').master('local[*]').getOrCreate()
    # spark.read读取parquet文件,并打印schema,和数据
    df2=spark.read.parquet('file:///root/test.parquet')
    df2.printSchema()
    df2.show()

4.2.3 读取 csv 文件创建 DataFrame

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 1-创建SparkSession入口
    spark=SparkSession.builder.appName('test').master('local[*]').getOrCreate()
    # 2-spark.read读取csv文件,并打印schema,和数据
    df3=spark.read.option('sep',';').option('header',True).option('inferSchema',True).csv('file:///root/test.csv')
    df3.printSchema()
    df3.show()
    spark.stop()

 4.3 加载文件时,什么时候用textFile?什么时候用read?

  • 如果加载的数据结构化程度不高,则用 textFile 返回 RDD 再处理
    from pyspark.sql import SparkSession
    
    if __name__ == '__main__':
        spark = SparkSession.builder.appName('test').master('local[*]').getOrCreate()
        sc = spark.sparkContext
        # 读取文件生成 RDD
        rdd1 = sc.textFile('file:///root/1.txt')
    
  • 如果加载的数据结构化程度很高,比如 mysql 或 半结构化数据 json、csv,则用 read 返回 DataFrame 再处理
    from pyspark.sql import SparkSession
    
    if __name__ == '__main__':
        spark = SparkSession.builder.appName('test').master('local[*]').getOrCreate()
        sc = spark.sparkContext
        # 读取文件生成 DataFrame(特殊RDD)
        rdd1 = spark.read.text('file:///root/1.txt')

以上是关于Spark中的DataFrame是什么?以及如何构建DataFrame?(附案例)的主要内容,如果未能解决你的问题,请参考以下文章

DataFrame简介

Spark SQL数据加载和保存实战

Spark SQL数据载入和保存实战

Spark SQL数据加载和保存实战

Spark SQL数据加载和保存实战

连接 Apache Spark DataFrame 中的列