SparkSQL-数据的加载和保存
Posted open_test01
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkSQL-数据的加载和保存相关的知识,希望对你有一定的参考价值。
通用的加载和保存方式
SparkSQL 提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的 API,根据不同的参数读取和保存不同格式的数据,SparkSQL 默认读取和保存的文件格式 为 parquet
加载数据
spark.read.load 是加载数据的通用方法 ,如果读取不同格式的数据,可以对不同的数据格式进行设定
spark.read.format("…")[.option("…")].load("…")
- format("…"):指定加载的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和 "textFile"。
- load("…"):在"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"格式下需要传入加载 数据的路径。
- option("…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable
val df = spark.read.format("json").load("input/user.json")
其实,我们也可以直 接在文件上进行查询: 文件格式.`文件路径`
spark.sql("select * from json.`input/user.json`").show
保存数据
df.write.save 是保存数据的通用方法,如果保存不同格式的数据,可以对不同的数据格式进行设定
- format("…"):指定保存的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和 "textFile"。
- save ("…"):在"csv"、"orc"、"parquet"和"textFile"格式下需要传入保存数据的路径。
- option("…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable
df.write.format("json").save("output/res")
保存操作可以使用 SaveMode, 用来指明如何处理数据,使用 mode()方法来设置。 有一点很重要: 这些 SaveMode 都是没有加锁的, 也不是原子操作。
SaveMode 是一个枚举类,其中的常量包括:
文件存在可再保存追加一个新的
df.write.mode("append").json("/opt/module/data/output")
csv文件
Spark SQL 可以配置 CSV 文件的列表信息,读取 CSV 文件,CSV 文件的第一行设置为 数据列
spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("data/user.csv")
操作mysql
Spark SQL 可以通过 JDBC 从关系型数据库中读取数据的方式创建 DataFrame,通过对 DataFrame 一系列的计算后,还可以将数据再写回关系型数据库中。如果使用 spark-shell 操 作,可在启动 shell 时指定相关的数据库驱动路径或者将相关的数据库驱动放到 spark 的类 路径下。
在 Idea 中通过 JDBC 对 Mysql 进行操作
导入依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
读取数据和保存数据
准数据
def main(args: Array[String]): Unit =
//创建sparksql的运行环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
//创建sparkSession对象
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
//读取mysql数据
val df = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://master:3306/spark-sql")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "p@ssw0rd")
.option("dbtable", "user")
.load()
df.show()
//保存数据
df.write
.format("jdbc")
.option("url", "jdbc:mysql://master:3306/spark-sql")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "p@ssw0rd")
.option("dbtable", "newUser")
.mode(SaveMode.Append) //追加
.save()
spark.stop()
操作内置Hive
若要把 Spark SQL 连接到一个部署好的 Hive 上,你必须把 hive-site.xml 复制到 Spark 的配置文件目录中($SPARK_HOME/conf)。即使没有部署好 Hive,Spark SQL 也可以 运行。 需要注意的是,如果你没有部署好 Hive,Spark SQL 会在当前的工作目录中创建出 自己的 Hive 元数据仓库,叫作 metastore_db。此外,如果你尝试使用 HiveQL 中的 CREATE TABLE (并非 CREATE EXTERNAL TABLE)语句来创建表,这些表会被放在你默 认的文件系统中的 /user/hive/warehouse 目录中(如果你的 classpath 中有配好的 hdfs-site.xml,默认的文件系统就是 HDFS,否则就是本地文件系统)。 spark-shell 默认是 Hive 支持的;代码中是默认不支持的,需要手动指定(加一个参数即可)。
查看表
如果没有则会在spark目录中自动生成一个源数据库,使用 Spark 内嵌的 Hive, 则什么都不用做, 直接使用即可. Hive 的元数据存储在 derby 中, 默认仓库地址:$SPARK_HOME/spark-warehouse
创建表
spark.sql("create table emp(id int)")
查看
将文件数据加载到表中
spark.sql("load data local inpath 'data/id.txt' into table emp")
操作外置Hive
如果想连接外部已经部署好的 Hive,需要通过以下几个步骤:
1、Spark 要接管 Hive 需要把 hive-site.xml 拷贝到 conf/目录下
2、把 Mysql 的驱动 copy 到 jars/目录下
3、 如果访问不到 hdfs,则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/目录下
4、重启 spark-shell
scala> spark.sql("show tables").show
20/04/25 22:05:14 WARN ObjectStore: Failed to get database global_temp, returning
NoSuchObjectException
+--------+--------------------+-----------+
|database| tableName|isTemporary|
+--------+--------------------+-----------+
| default| emp| false|
| default|hive_hbase_emp_table| false|
| default| relevance_hbase_emp| false|
| default| staff_hive| false|
| default| ttt| false|
| default| user_visit_action| false|
+--------+--------------------+-----------+
Spark SQL CLI
Spark SQL CLI 可以很方便的在本地运行 Hive 元数据服务以及从命令行执行查询任务。在 Spark 目录下执行如下命令启动 Spark SQL CLI,直接执行 SQL 语句,类似一 Hive 窗口
bin/spark-sql
代码操作 Hive
导入依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
将 hive-site.xml 文件拷贝到项目的 resources 目录中
后台启动hive
hiveserver2 &
nohup hive --service metastore &
在IEDA中连接使用
def main(args: Array[String]): Unit =
//创建sparksql的运行环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
//创建sparkSession对象
val spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
//使用sparkSQL连接外置hive
spark.sql("create database ee")
spark.sql("show databases").show()
spark.close()
回到虚拟机中查看hive是否操作成功
Spark SQL数据加载和保存实战
一:前置知识详解:
Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作,
Load:可以创建DataFrame,
Save:把DataFrame中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型。
二:Spark SQL读写数据代码实战:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.ArrayList;
import java.util.List;
public class SparkSQLLoadSaveOps {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkSQLLoadSaveOps");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext = new SQLContext(sc);
/**
* read()是DataFrameReader类型,load可以将数据读取出来
*/
DataFrame peopleDF = sqlContext.read().format("json").load("E:\\\\Spark\\\\Sparkinstanll_package\\\\Big_Data_Software\\\\spark-1.6.0-bin-hadoop2.6\\\\examples\\\\src\\\\main\\\\resources\\\\people.json");
/**
* 直接对DataFrame进行操作
* Json: 是一种自解释的格式,读取Json的时候怎么判断其是什么格式?
* 通过扫描整个Json。扫描之后才会知道元数据
*/
//通过mode来指定输出文件的是append。创建新文件来追加文件
peopleDF.select("name").write().mode(SaveMode.Append).save("E:\\\\personNames");
}
}
读取过程源码分析如下:
1. read方法返回DataFrameReader,用于读取数据。
/**
* :: Experimental ::
* Returns a [[DataFrameReader]] that can be used to read data in as a [[DataFrame]].
* {{{
* sqlContext.read.parquet("/path/to/file.parquet")
* sqlContext.read.schema(schema).json("/path/to/file.json")
* }}}
*
* @group genericdata
* @since 1.4.0
*/
@Experimental
//创建DataFrameReader实例,获得了DataFrameReader引用
def read: DataFrameReader = new DataFrameReader(this)
2. 然后再调用DataFrameReader类中的format,指出读取文件的格式。
/**
* Specifies the input data source format.
*
* @since 1.4.0
*/
def format(source: String): DataFrameReader = {
this.source = source
this
}
3. 通过DtaFrameReader中load方法通过路径把传入过来的输入变成DataFrame。
/**
* Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by
* a local or distributed file system).
*
* @since 1.4.0
*/
// TODO: Remove this one in Spark 2.0.
def load(path: String): DataFrame = {
option("path", path).load()
}
至此,数据的读取工作就完成了,下面就对DataFrame进行操作。
下面就是写操作!!!
1. 调用DataFrame中select函数进行对列筛选
/**
* Selects a set of columns. This is a variant of `select` that can only select
* existing columns using column names (i.e. cannot construct expressions).
*
* {{{
* // The following two are equivalent:
* df.select("colA", "colB")
* df.select($"colA", $"colB")
* }}}
* @group dfops
* @since 1.3.0
*/
@scala.annotation.varargs
def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)
2. 然后通过write将结果写入到外部存储系统中。
/**
* :: Experimental ::
* Interface for saving the content of the [[DataFrame]] out into external storage.
*
* @group output
* @since 1.4.0
*/
@Experimental
def write: DataFrameWriter = new DataFrameWriter(this)
3. 在保持文件的时候mode指定追加文件的方式
/**
* Specifies the behavior when data or table already exists. Options include:
// Overwrite是覆盖
* - `SaveMode.Overwrite`: overwrite the existing data.
//创建新的文件,然后追加
* - `SaveMode.Append`: append the data.
* - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
* - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
*
* @since 1.4.0
*/
def mode(saveMode: SaveMode): DataFrameWriter = {
this.mode = saveMode
this
}
4. 最后,save()方法触发action,将文件输出到指定文件中。
/**
* Saves the content of the [[DataFrame]] at the specified path.
*
* @since 1.4.0
*/
def save(path: String): Unit = {
this.extraOptions += ("path" -> path)
save()
}
三:Spark SQL读写整个流程图如下:
四:对于流程中部分函数源码详解:
DataFrameReader.Load()
1. Load()返回DataFrame类型的数据集合,使用的数据是从默认的路径读取。
/**
* Returns the dataset stored at path as a DataFrame,
* using the default data source configured by spark.sql.sources.default.
*
* @group genericdata
* @deprecated As of 1.4.0, replaced by `read().load(path)`. This will be removed in Spark 2.0.
*/
@deprecated("Use read.load(path). This will be removed in Spark 2.0.", "1.4.0")
def load(path: String): DataFrame = {
//此时的read就是DataFrameReader
read.load(path)
}
2. 追踪load源码进去,源码如下:
在DataFrameReader中的方法。Load()通过路径把输入传进来变成一个DataFrame。
/**
* Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by
* a local or distributed file system).
*
* @since 1.4.0
*/
// TODO: Remove this one in Spark 2.0.
def load(path: String): DataFrame = {
option("path", path).load()
}
3. 追踪load源码如下:
/**
* Loads input in as a [[DataFrame]], for data sources that don't require a path (e.g. external
* key-value stores).
*
* @since 1.4.0
*/
def load(): DataFrame = {
//对传入的Source进行解析
val resolved = ResolvedDataSource(
sqlContext,
userSpecifiedSchema = userSpecifiedSchema,
partitionColumns = Array.empty[String],
provider = source,
options = extraOptions.toMap)
DataFrame(sqlContext, LogicalRelation(resolved.relation))
}
DataFrameReader.format()
1. Format:具体指定文件格式,这就获得一个巨大的启示是:如果是Json文件格式可以保持为Parquet等此类操作。
Spark SQL在读取文件的时候可以指定读取文件的类型。例如,Json,Parquet.
/**
* Specifies the input data source format.Built-in options include “parquet”,”json”,etc.
*
* @since 1.4.0
*/
def format(source: String): DataFrameReader = {
this.source = source //FileType
this
}
DataFrame.write()
1. 创建DataFrameWriter实例
/**
* :: Experimental ::
* Interface for saving the content of the [[DataFrame]] out into external storage.
*
* @group output
* @since 1.4.0
*/
@Experimental
def write: DataFrameWriter = new DataFrameWriter(this)
2. 追踪DataFrameWriter源码如下:
以DataFrame的方式向外部存储系统中写入数据。
/**
* :: Experimental ::
* Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems,
* key-value stores, etc). Use [[DataFrame.write]] to access this.
*
* @since 1.4.0
*/
@Experimental
final class DataFrameWriter private[sql](df: DataFrame) {
DataFrameWriter.mode()
1. Overwrite是覆盖,之前写的数据全都被覆盖了。
Append:是追加,对于普通文件是在一个文件中进行追加,但是对于parquet格式的文件则创建新的文件进行追加。
/**
* Specifies the behavior when data or table already exists. Options include:
* - `SaveMode.Overwrite`: overwrite the existing data.
* - `SaveMode.Append`: append the data.
* - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
//默认操作
* - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
*
* @since 1.4.0
*/
def mode(saveMode: SaveMode): DataFrameWriter = {
this.mode = saveMode
this
}
2. 通过模式匹配接收外部参数
/**
* Specifies the behavior when data or table already exists. Options include:
* - `overwrite`: overwrite the existing data.
* - `append`: append the data.
* - `ignore`: ignore the operation (i.e. no-op).
* - `error`: default option, throw an exception at runtime.
*
* @since 1.4.0
*/
def mode(saveMode: String): DataFrameWriter = {
this.mode = saveMode.toLowerCase match {
case "overwrite" => SaveMode.Overwrite
case "append" => SaveMode.Append
case "ignore" => SaveMode.Ignore
case "error" | "default" => SaveMode.ErrorIfExists
case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
"Accepted modes are 'overwrite', 'append', 'ignore', 'error'.")
}
this
}
DataFrameWriter.save()
1. save将结果保存传入的路径。
/**
* Saves the content of the [[DataFrame]] at the specified path.
*
* @since 1.4.0
*/
def save(path: String): Unit = {
this.extraOptions += ("path" -> path)
save()
}
2. 追踪save方法。
/**
* Saves the content of the [[DataFrame]] as the specified table.
*
* @since 1.4.0
*/
def save(): Unit = {
ResolvedDataSource(
df.sqlContext,
source,
partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
mode,
extraOptions.toMap,
df)
}
3. 其中source是SQLConf的defaultDataSourceName
private var source: String = df.sqlContext.conf.defaultDataSourceName
其中DEFAULT_DATA_SOURCE_NAME默认参数是parquet。
// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default",
defaultValue = Some("org.apache.spark.sql.parquet"),
doc = "The default data source to use in input/output.")
DataFrame.scala中部分函数详解:
1. toDF函数是将RDD转换成DataFrame
/**
* Returns the object itself.
* @group basic
* @since 1.3.0
*/
// This is declared with parentheses to prevent the Scala compiler from treating
// `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
def toDF(): DataFrame = this
2. show()方法:将结果显示出来
/**
* Displays the [[DataFrame]] in a tabular form. For example:
* {{{
* year month AVG('Adj Close) MAX('Adj Close)
* 1980 12 0.503218 0.595103
* 1981 01 0.523289 0.570307
* 1982 02 0.436504 0.475256
* 1983 03 0.410516 0.442194
* 1984 04 0.450090 0.483521
* }}}
* @param numRows Number of rows to show
* @param truncate Whether truncate long strings. If true, strings more than 20 characters will
* be truncated and all cells will be aligned right
*
* @group action
* @since 1.5.0
*/
// scalastyle:off println
def show(numRows: Int, truncate: Boolean): Unit = println(showString(numRows, truncate))
// scalastyle:on println
追踪showString源码如下:showString中触发action收集数据。
/**
* Compose the string representing rows for output
* @param _numRows Number of rows to show
* @param truncate Whether truncate long strings and align cells right
*/
private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = {
val numRows = _numRows.max(0)
val sb = new StringBuilder
val takeResult = take(numRows + 1)
val hasMoreData = takeResult.length > numRows
val data = takeResult.take(numRows)
val numCols = schema.fieldNames.length
以上是关于SparkSQL-数据的加载和保存的主要内容,如果未能解决你的问题,请参考以下文章