Spark基础学习笔记25:Spark SQL数据源 - Parquet文件
Posted howard2005
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark基础学习笔记25:Spark SQL数据源 - Parquet文件相关的知识,希望对你有一定的参考价值。
文章目录
零、本讲学习目标
- 掌握Parquet文件的读取与写入
- 掌握如何进行耗时的Schema合并
一、Parquet概述
- Apache Parquet是Hadoop生态系统中任何项目都可以使用的
列式存储格式
,不受数据处理框架、数据模型和编程语言的影响。Spark SQL支持对Parquet文件的读写,并且可以自动保存源数据的Schema
。当写入Parquet文件时,为了提高兼容性,所有列都会自动转换为“可为空
”状态。
二、读取和写入Parquet的方法
- 加载和写入Parquet文件时,除了可以使用load()方法和save()方法外,还可以直接使用Spark SQL内置的parquet()方法
(一)利用parquet()方法读取parquet文件
1、读取parquet文件
- 执行命令:
val usersdf = spark.read.parquet("hdfs://master:9000/input/users.parquet")
2、显示数据帧内容
- 执行命令:
usersdf.show()
(二)利用parquet()方法写入parquet文件
1、写入parquet文件
- 执行命令:
usersdf.select("name", "favorite_color").write.parquet("hdfs://master:9000/result")
- 报错说
/result
目录已经存在,有两种解决问题的方式,一个是删除result
目录,一个是修改命令,设置覆盖模式 - 导入
SaveMode
类后,执行命令:usersdf.select("name", "favorite_color").write.mode(SaveMode.Overwrite)parquet("hdfs://master:9000/result")
2、查看生成的parquet文件
- 在slave1虚拟机上执行命令:
hdfs dfs -ls /result
三、Schema合并
(一)Schema合并概述
- 与Protocol Buffer、Avro和Thrift一样,Parquet也支持Schema合并。刚开始可以先定义一个简单的Schema,然后根据业务需要逐步向Schema中添加更多的列,最终会产生多个Parquet文件,各个Parquet文件的Schema不同,但是相互兼容。对于这种情况,Spark SQL读取Parquet数据源时可以自动检测并合并所有Parquet文件的Schema。
(二)开启Schema合并功能
- 由于Schema合并是一个相对耗时的操作,并且在多数情况下不是必需的,因此从Spark 1.5.0开始默认将Schema自动合并功能关闭,可以通过两种方式开启。
1、利用option()方法设置
- 读取Parquet文件时,通过调用
option()
方法将数据源的属性mergeSchema
设置为true
val mergedDF = spark.read.option("mergeSchema", "true").parquet("hdfs://master:9000/students")
2、利用config()方法设置
- 构建SparkSession对象时,通过调用
config()
方法将全局SQL属性spark.sql.parquet.mergeSchema
设置为true
val spark = SparkSession.builder()
.appName("SparkSQLDataSource")
.config("spark.sql.parquet.mergeSchema", true)
.master("local[*]")
.getOrCreate()
(三)案例演示Schema合并
1、提出任务
- 向HDFS的目录
/students
中首先写入两个学生的姓名和年龄信息,然后写入两个学生的姓名和成绩信息,最后读取/students
目录中的所有学生数据并合并Schema。
2、完成任务
- 创建
SchemaMergeDemo
单例对象
package net.hw.sparksql
import org.apache.spark.sql.SaveMode, SparkSession
/**
* 功能:演示Schema合并
* 作者:华卫
* 日期:2022年05月11日
*/
object SchemaMergeDemo
def main(args: Array[String]): Unit =
// 创建或得到SparkSession
val spark = SparkSession.builder()
.appName("SparkSQLDataSource")
.config("spark.sql.parquet.mergeSchema", true)
.master("local[*]")
.getOrCreate()
// 导入隐式转换
import spark.implicits._
// 创建列表集合,存储姓名和年龄
val studentList1 = List(("李克文", 23), ("张晓琳", 28))
// 将列表集合转为数据帧,并指定列名name和age
val studentDF1 = spark.sparkContext
.makeRDD(studentList1)
.toDF("name", "age")
// 输出数据帧内容
studentDF1.show()
// 将数据帧写入HDFS指定目录
studentDF1.write.mode(SaveMode.Append)
.parquet("hdfs://master:9000/students")
// 创建列表集合,存储姓名和成绩
val studentList2 = List(("无心剑", 99), ("陈鸿宇", 78))
// 将列表集合转为数据帧,并指定列名name和age
val studentDF2 = spark.sparkContext
.makeRDD(studentList2)
.toDF("name", "score")
// 输出数据帧内容
studentDF2.show()
// 将数据帧写入HDFS指定目录
studentDF2.write.mode(SaveMode.Append)
.parquet("hdfs://master:9000/students")
// 读取指定目录下多个文件
val mergedDF = spark.read.option("mergeSchema", true)
.parquet("hdfs://master:9000/students")
// 输出Schema信息
mergedDF.printSchema()
// 输出数据帧内容
mergedDF.show()
- 运行程序,抛出两个异常
- 第一个是IO异常:
java.io.IOException: Could not locate executable null\\bin\\winutils.exe in the Hadoop binaries.
- 第二个是访问控制异常:
org.apache.hadoop.security.AccessControlException: Permission denied: user=Administrator, access=WRITE, inode="/students":root:supergroup:drwxr-xr-x
- 先解决第一个异常。在Windows环境中缺少
winutils.exe
程序而抛出异常。一般情况下,Spark运行在Linux系统上,在Windows下运行时需要安装支持插件hadoop2.7-common-bin
- 下载链接:https://pan.baidu.com/s/1TbmAnwi4hscLtlqV9-rrXg 提取码:plis
- 解压缩到指定目录
- 设置环境变量,让系统可以搜索到
winutils.exe
- 解决第二个异常。添上一句,设置HADOOP用户名属性
- 创建日志属性文件 -
log4j.properties
,免得看见太多Spark的log信息
log4j.rootLogger=stdout, logfile
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spark.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
- 运行程序,查看结果
- 从输出的Schema信息和数据内容可以看出,Spark SQL在读取Parquet文件数据时,自动将不同文件的Schema信息进行合并。
以上是关于Spark基础学习笔记25:Spark SQL数据源 - Parquet文件的主要内容,如果未能解决你的问题,请参考以下文章
Spark基础学习笔记27:Spark SQL数据源 - Hive表
学习笔记Spark—— Spark SQL应用—— Spark DataFrame基础操作
Spark基础学习笔记28:Spark SQL数据源 - JDBC
学习笔记Spark—— Spark SQL应用—— Spark DataSet基础操作