SparkSQL 读写 Mysql
Posted 宝哥大数据
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkSQL 读写 Mysql相关的知识,希望对你有一定的参考价值。
参考: https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
一、读 mysql
package com.chb.test
import java.util.Properties
import org.apache.spark.sql.SaveMode, SparkSession
object Test3
def main(args: Array[String]): Unit =
val spark = SparkSession.builder()
.master("local[*]")
.getOrCreate()
//mysql config
val url="jdbc:mysql://chb1:3306/test"
val tableName="person"
// 配置连接数据库的相关属性
val properties = new Properties()
properties.setProperty("user","root")
properties.setProperty("password","123456")
// 第一种
// val jdbcDF = spark.read.jdbc(url, tableName,properties)
// 第二种
val jdbcDF = spark.read
.format("jdbc")
.option("url", url)
.option("user", "root")
.option("password", "123456")
.option("dbtable", tableName) -- 指定表名
.load()
case class Person(name: String, age: Long)
1.2、 query
spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("query", "select c1, c2 from t1")
.load()
二、写 Mysql
package com.chb.test
import java.util.Properties
import org.apache.spark.sql.SaveMode, SparkSession
object Test3
def main(args: Array[String]): Unit =
val spark = SparkSession.builder()
.master("local[*]")
.getOrCreate()
//mysql config
val url="jdbc:mysql://chb1:3306/test"
val tableName="person"
// 配置连接数据库的相关属性
val properties = new Properties()
properties.setProperty("user","root")
properties.setProperty("password","123456")
// 写mysql
import spark.implicits._
val dfResult = Seq(Person("lisan", 33)).toDF
dfResult.write.mode(SaveMode.Append)
.jdbc(url, tableName, properties)
case class Person(name: String, age: Long)
2.1、注意 SaveMode
- Append 追加
- Overwrite 覆盖, 将表中数据清理,重新写入
- ErrorIfExists 存在报错
- Ignore
2.2、写入的数据结构也表结构不一致,Append 会对没有值字段赋值null, Overwrite会修改表结构
创建测试表与数据
create table person(id int, name varchar(30), age int);
-- 插入数据
insert into person values(1, 'chb', 23), (2, 'ling', 18);
2.2.1、Append
// 写mysql
import spark.implicits._
val dfResult = Seq(Person("lisan", 33)).toDF
dfResult.write.mode(SaveMode.Append)
.jdbc(url, tableName, properties)
没有字段会赋值null
2.2.2、Overwrite (生产上需要注意字段完整)
SparkSQL编程方式纯SQL读写Hive数据源
一、准备工作
- 实验环境:IDEA + CentOS7 + Spark2.4.8+Hive2.3.3+MySQL2.7+Hadoop2.7.3
- 数据准备:Hive中的emp员工信息表
- 前置工作:
- Hadoop开启HDFS服务(
必选
) - 开启hiveserver2服务(可选)
- 开启Spark服务(可选)
- 将hive-site.xml复制到idea工程下的resources目录下(
必选
)
- Hadoop开启HDFS服务(
二、任务分析
借助sparksql读写hive表,利用纯的SQL来完成对emp表中按照部门求其工资,并按照工资总额进行降序排序。
三、编码实现
- 创建maven工程
- 添加maven依赖,即在pom.xml中添加hive的依赖,spark的依赖请参考之前的实验,如下:
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.12</artifactId> <version>2.4.8</version> </dependency>
- 将虚拟机中的hive-site.xml导出来,并放置在idea工程resources目录下,如图所示:
- 创建
HiveDemo.scala
的object对象,编写如下代码即可:
import org.apache.spark.sql.SparkSession
object HiveDemo {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local[2]")
.appName("Test")
// 如不配置,则使用本地的warehouse
.config("spark.sql.warehouse.dir", "hdfs://niit01:9000/user/hive/warehouse")
.enableHiveSupport()
.getOrCreate()
// 引入隐士函数
import spark.implicits._
import spark.sql
sql("select deptNo,sum(sal) as total from emp group by deptNo order by total desc").show
// 关系资源
spark.stop()
}
}
注意:代码中的.config部分是hdfs上的路径,故需要开启hdfs服务。如删除.config,则会在工程目录下生成metastore_db目录,其作为hive的元数据库的目录
五、运行测试
以上是关于SparkSQL 读写 Mysql的主要内容,如果未能解决你的问题,请参考以下文章