Spark SQL实战(07)-Data Sources

Posted JavaEdge.

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark SQL实战(07)-Data Sources相关的知识,希望对你有一定的参考价值。

1 概述

Spark SQL通过DataFrame接口支持对多种数据源进行操作。

DataFrame可使用关系型变换进行操作,也可用于创建临时视图。将DataFrame注册为临时视图可以让你对其数据运行SQL查询。

本节介绍使用Spark数据源加载和保存数据的一般方法,并进一步介绍可用于内置数据源的特定选项。

数据源关键操作:

  • load
  • save

2 大数据作业基本流程

input 业务逻辑 output
不管是使用MR/Hive/Spark/Flink/Storm。

Spark能处理多种数据源的数据,而且这些数据源可以是在不同地方:

  • file/HDFS/S3/OSS/COS/RDBMS
  • json/ORC/Parquet/JDBC
object DataSourceApp 

  def main(args: Array[String]): Unit = 
    val spark: SparkSession = SparkSession.builder()
      .master("local").getOrCreate()
    
    text(spark)
    // json(spark)
    // common(spark)
    // parquet(spark)

    // convert(spark)

    // jdbc(spark)
    jdbc2(spark)
    spark.stop()
  

3 text数据源读写

读取文本文件的 API,SparkSession.read.text()

参数:

  • path:读取文本文件的路径。可以是单个文件、文件夹或者包含通配符的文件路径。
  • wholetext:如果为 True,则将整个文件读取为一条记录;否则将每行读取为一条记录。
  • lineSep:如果指定,则使用指定的字符串作为行分隔符。
  • pathGlobFilter:用于筛选文件的通配符模式。
  • recursiveFileLookup:是否递归查找子目录中的文件。
  • allowNonExistingFiles:是否允许读取不存在的文件。
  • allowEmptyFiles:是否允许读取空文件。

返回一个 DataFrame 对象,其中每行是文本文件中的一条记录。

def text(spark: SparkSession): Unit = 
  import spark.implicits._

  val textDF: DataFrame = spark.read.text(
    "/Users/javaedge/Downloads/sparksql-train/data/people.txt")

  val result: Dataset[(String, String)] = textDF.map(x => 
    val splits: Array[String] = x.getString(0).split(",")
    (splits(0).trim, splits(1).trim)
  )

编译无问题,运行时报错:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Text data source supports only a single column, and you have 2 columns.;

思考下,如何使用text方式,输出多列的值?

修正后

val result: Dataset[String] = textDF.map(x => 
  val splits: Array[String] = x.getString(0).split(",")
  splits(0).trim
)

result.write.text("out")

继续报错:

Exception in thread "main" org.apache.spark.sql.AnalysisException: path file:/Users/javaedge/Downloads/sparksql-train/out already exists.;

回想Hadoop中MapReduce的输出:

  • 第一次0K
  • 第二次也会报错输出目录已存在

这关系到 Spark 中的 mode

SaveMode

Spark SQL中,使用DataFrame或Dataset的write方法将数据写入外部存储系统时,使用“SaveMode”参数指定如何处理已存在的数据。

SaveMode有四种取值:

  1. SaveMode.ErrorIfExists:如果目标路径已经存在,则会引发异常
  2. SaveMode.Append:将数据追加到现有数据
  3. SaveMode.Overwrite:覆盖现有数据
  4. SaveMode.Ignore:若目标路径已经存在,则不执行任何操作

所以,修正如下:

result.write.mode(SaveMode.overwrite).text("out")

4 JSON 数据源

// JSON
def json(spark: SparkSession): Unit = 
  import spark.implicits._

  val jsonDF: DataFrame = spark.read.json(
    "/Users/javaedge/Downloads/sparksql-train/data/people.json")

  jsonDF.show()

  // 只要age>20的数据
  jsonDF.filter("age > 20")
    .select("name")
    .write.mode(SaveMode.Overwrite).json("out")
  
output:
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

嵌套 JSON

// 嵌套 JSON
val jsonDF2: DataFrame = spark.read.json(
  "/Users/javaedge/Downloads/sparksql-train/data/people2.json")
jsonDF2.show()

jsonDF2.select($"name",
  $"age",
  $"info.work".as("work"),
  $"info.home".as("home"))
  .write.mode("overwrite")
  .json("out")

output:
+---+-------------------+----+
|age|               info|name|
+---+-------------------+----+
| 30|[shenzhen, beijing]|  PK|
+---+-------------------+----+

5 标准写法

// 标准API写法
private def common(spark: SparkSession): Unit = 
  import spark.implicits._

  val textDF: DataFrame = spark.read.format("text").load(
    "/Users/javaedge/Downloads/sparksql-train/data/people.txt")
  val jsonDF: DataFrame = spark.read.format("json").load(
    "/Users/javaedge/Downloads/sparksql-train/data/people.json")
  textDF.show()
  println("~~~~~~~~")
  jsonDF.show()

  jsonDF.write.format("json").mode("overwrite").save("out")



output:
+-----------+
|      value|
+-----------+
|Michael, 29|
|   Andy, 30|
| Justin, 19|
+-----------+

~~~~~~~~
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

6 Parquet数据源

6.1 简介

一种列式存储格式,在大数据环境中高效地存储和处理数据。由Hadoop生态系统中的Apache Parquet项目开发的。

6.2 设计目标

支持高效的列式存储和压缩,并提供高性能的读/写能力,以便处理大规模结构化数据。

Parquet可以与许多不同的计算框架一起使用,如Apache Hadoop、Apache Spark、Apache Hive等,因此广泛用于各种大数据应用程序中。

6.3 优点

高性能、节省存储空间、支持多种编程语言和数据类型、易于集成和扩展等。

private def parquet(spark: SparkSession): Unit = 
  import spark.implicits._

  val parquetDF: DataFrame = spark.read.parquet(
    "/Users/javaedge/Downloads/sparksql-train/data/users.parquet")
  parquetDF.printSchema()
  parquetDF.show()

  parquetDF.select("name", "favorite_numbers")
    .write.mode("overwrite")
    .option("compression", "none")
    .parquet("out")
  
output:
root
 |-- name: string (nullable = true)
 |-- favorite_color: string (nullable = true)
 |-- favorite_numbers: array (nullable = true)
 |    |-- element: integer (containsNull = true)

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

7convert

方便从一种数据源写到另一种数据源。

存储类型转换:JSON==>Parquet

def convert(spark: SparkSession): Unit = 
  import spark.implicits._

  val jsonDF: DataFrame = spark.read.format("json")
    .load("/Users/javaedge/Downloads/sparksql-train/data/people.json")
  jsonDF.show()

  jsonDF.filter("age>20")
    .write.format("parquet").mode(SaveMode.Overwrite).save("out")

8 JDBC

有些数据是在mysql,使用Spark处理,肯定要通过Spark读出MySQL的数据。
数据源是text/json,通过Spark处理完后,要将统计结果写入MySQL。

查 DB

写法一

def jdbc(spark: SparkSession): Unit = 
  import spark.implicits._

  val jdbcDF = spark.read
    .format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306")
    .option("dbtable", "smartrm_monolith.order")
    .option("user", "root")
    .option("password", "root")
    .load()

  jdbcDF.filter($"order_id" > 150).show(100)

写法二

val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "root")

val jdbcDF2: DataFrame = spark.read
  .jdbc(url, srcTable, connectionProperties)

jdbcDF2.filter($"order_id" > 100)

写 DB

val connProps = new Properties()
connProps.put("user", "root")
connProps.put("password", "root")

val jdbcDF: DataFrame = spark.read.jdbc(url, srcTable, connProps)

jdbcDF.filter($"order_id" > 100)
  .write.jdbc(url, "smartrm_monolith.order_bak", connProps)

若 目标表不存在,会自动帮你创建:

统一配置管理

如何将那么多数据源配置参数统一管理呢?

先引入依赖:

<dependency>
    <groupId>com.typesafe</groupId>
    <artifactId>config</artifactId>
    <version>1.3.3</version>
</dependency>

配置文件:

读配置的程序:

package com.javaedge.bigdata.chapter05

import com.typesafe.config.Config, ConfigFactory

object ParamsApp 

  def main(args: Array[String]): Unit = 

    val config: Config = ConfigFactory.load()
    val url: String = config.getString("db.default.url")
    println(url)

  


private def jdbcConfig(spark: SparkSession): Unit = 
  import spark.implicits._

  val config = ConfigFactory.load()
  val url = config.getString("db.default.url")
  val user = config.getString("db.default.user")
  val password = config.getString("db.default.password")
  val driver = config.getString("db.default.driver")
  val database = config.getString("db.default.database")
  val table = config.getString("db.default.table")
  val sinkTable = config.getString("db.default.sink.table")

  val connectionProperties = new Properties()
  connectionProperties.put("user", user)
  connectionProperties.put("password", password)

  val jdbcDF: DataFrame = spark.read.jdbc(url, s"$database.$table", connectionProperties)

  jdbcDF.filter($"order_id" > 100).show()

写到新表:

jdbcDF.filter($"order_id" > 158)
.write.jdbc(url, s"$database.$sinkTable", connectionProperties)

Structured Streaming 实战案例 读取Scoker

1.1.1.读取Socket数据

 

●准备工作

nc -lk 9999

hadoop spark sqoop hadoop spark hive hadoop

●代码演示:

import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object WordCount {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession,因为StructuredStreaming的数据模型也是DataFrame/DataSet
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.接收数据
    val dataDF: DataFrame = spark.readStream
      .option("host", "node01")
      .option("port", 9999)
      .format("socket")
      .load()
    //3.处理数据
    import spark.implicits._
    val dataDS: Dataset[String] = dataDF.as[String]
    val wordDS: Dataset[String] = dataDS.flatMap(_.split(" "))
    val result: Dataset[Row] = wordDS.groupBy("value").count().sort($"count".desc)
    //result.show()
    //Queries with streaming sources must be executed with writeStream.start();
    result.writeStream
      .format("console")//往控制台写
      .outputMode("complete")//每次将所有的数据写出
      .trigger(Trigger.ProcessingTime(0))//触发时间间隔,0表示尽可能的快
      .option("checkpointLocation","./810")//设置checkpoint目录,用来做合并
      .start()//开启
      .awaitTermination()//等待停止
  }
}
 
 
 
 
 
32
 
 
 
 
 
1
import org.apache.spark.SparkContext
2
import org.apache.spark.sql.streaming.Trigger
3
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
4

5
object WordCount {
6
  def main(args: Array[String]): Unit = {
7
    //1.创建SparkSession,因为StructuredStreaming的数据模型也是DataFrame/DataSet
8
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
9
    val sc: SparkContext = spark.sparkContext
10
    sc.setLogLevel("WARN")
11
    //2.接收数据
12
    val dataDF: DataFrame = spark.readStream
13
      .option("host", "node01")
14
      .option("port", 9999)
15
      .format("socket")
16
      .load()
17
    //3.处理数据
18
    import spark.implicits._
19
    val dataDS: Dataset[String] = dataDF.as[String]
20
    val wordDS: Dataset[String] = dataDS.flatMap(_.split(" "))
21
    val result: Dataset[Row] = wordDS.groupBy("value").count().sort($"count".desc)
22
    //result.show()
23
    //Queries with streaming sources must be executed with writeStream.start();
24
    result.writeStream
25
      .format("console")//往控制台写
26
      .outputMode("complete")//每次将所有的数据写出
27
      .trigger(Trigger.ProcessingTime(0))//触发时间间隔,0表示尽可能的快
28
      .option("checkpointLocation","./810")//设置checkpoint目录,用来做合并
29
      .start()//开启
30
      .awaitTermination()//等待停止
31
  }
32
}
 
 
代码截图:
技术图片
 

 



以上是关于Spark SQL实战(07)-Data Sources的主要内容,如果未能解决你的问题,请参考以下文章

Structured Streaming 实战案例 读取Scoker

Spark SQL知识点与实战

第57课:Spark SQL on Hive配置及实战

Spark Structured Streaming实战

Spark修炼之道(进阶篇)——Spark入门到精通:第十节 Spark SQL案例实战

Spark SQL数据加载和保存实战