An example of reading a MySQL database with Spark
Posted by 向前展望,倒后推理
1. Spark can read from many data sources; this example reads from MySQL.
2. Prerequisites:
Scala, IntelliJ IDEA, and the mysql-connector-java driver; look up matching versions at https://mvnrepository.com/
3. Reading data, method one:
object WordFreq {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local")
      .appName("getDatafromMysql")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()
    val properties: Properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "root")
    properties.setProperty("driver", "com.mysql.jdbc.Driver")
    // Variant 1: read an entire table
    val person: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/acc", "ttt", properties)
    person.show()
    // Variant 2: read the result of a subquery (the table alias, here T, is required)
    spark.read.jdbc("jdbc:mysql://localhost:3306/acc", "(select * from ut_tt) T", properties).show()
  }
}
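For larger tables, `spark.read.jdbc` also has an overload that splits the scan into several parallel partitions by a numeric column. A minimal sketch, assuming the `ttt` table above has a numeric `id` column with values roughly in [0, 100000] (the column name and bounds are illustrative assumptions, not from the original post):

```scala
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

object PartitionedRead {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("partitionedRead")
      .getOrCreate()
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "root")
    properties.setProperty("driver", "com.mysql.jdbc.Driver")
    // Split the scan into 4 partitions on the assumed numeric column "id".
    // Rows with id outside [0, 100000] still land in the first/last partition;
    // the bounds only decide how the range is carved up, not what is read.
    val person: DataFrame = spark.read.jdbc(
      "jdbc:mysql://localhost:3306/acc",
      "ttt",
      columnName = "id",
      lowerBound = 0L,
      upperBound = 100000L,
      numPartitions = 4,
      connectionProperties = properties)
    person.show()
  }
}
```

Each partition becomes one JDBC query with its own WHERE clause on the bound column, so the work is spread across tasks instead of a single connection.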
Reading data, method two:
val spark: SparkSession = SparkSession.builder()
  .master("local")
  .appName("getDatafromMysql")
  .config("spark.sql.shuffle.partitions", 1)
  .getOrCreate()
val map: Map[String, String] = Map[String, String](
  "url" -> "jdbc:mysql://localhost:3306/yyyy",
  "driver" -> "com.mysql.jdbc.Driver",
  "user" -> "root",
  "password" -> "root",
  "dbtable" -> "notice")
val score: DataFrame = spark.read.format("jdbc").options(map).load()
score.show()
Reading data, method three:
val spark: SparkSession = SparkSession.builder()
  .master("local")
  .appName("getDatafromMysql")
  .config("spark.sql.shuffle.partitions", 1)
  .getOrCreate()
val reader: DataFrameReader = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/yyyyyy")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("user", "root")
  .option("password", "root")
  .option("dbtable", "notice")
val source2: DataFrame = reader.load()
source2.show()
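Besides `dbtable`, Spark 2.4 and later also accept a `query` option, which pushes an arbitrary SELECT down to MySQL (`dbtable` and `query` are mutually exclusive). A sketch under the same assumed connection settings as above; the column names in the SELECT are illustrative:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark: SparkSession = SparkSession.builder()
  .master("local")
  .appName("getDatafromMysql")
  .getOrCreate()
val filtered: DataFrame = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/yyyyyy")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("user", "root")
  .option("password", "root")
  // "query" replaces "dbtable"; Spark wraps it as a subquery on the MySQL side,
  // so the filtering happens in the database rather than in Spark
  .option("query", "select * from notice where id > 100")
  .load()
filtered.show()
```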
Inserting data into MySQL
// Insert a query result into a MySQL table
val spark: SparkSession = SparkSession.builder()
  .master("local")
  .appName("getDatafromMysql")
  .config("spark.sql.shuffle.partitions", 1)
  .getOrCreate()
val result = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/ttttttt") // ttttttt is the database name
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "notice") // notice is the table name
  .option("user", "root")
  .option("password", "root")
  .load()
val properties: Properties = new Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "root")
properties.setProperty("driver", "com.mysql.jdbc.Driver")
properties.setProperty("characterEncoding", "utf8")
result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/iii", "notice_copy1", properties)
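The write side can be tuned with JDBC writer options: `SaveMode.Overwrite` normally drops and recreates the target table, the `truncate` option keeps the existing schema instead, and `batchsize` controls how many rows go into each JDBC batch insert. A sketch, continuing from the `result` DataFrame and `properties` defined in the snippet above:

```scala
// Assumes "result" and "properties" from the preceding snippet
result.write
  .mode(SaveMode.Overwrite)
  .option("truncate", "true")  // TRUNCATE TABLE instead of DROP + CREATE, preserving indexes
  .option("batchsize", "5000") // rows per JDBC batch insert (Spark's default is 1000)
  .jdbc("jdbc:mysql://localhost:3306/iii", "notice_copy1", properties)
```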
Required imports:
import java.util.Properties
import org.apache.spark.sql.{DataFrame, DataFrameReader, SaveMode, SparkSession}
In the pom file, add the mysql-connector-java dependency (note: the examples above use the 5.x driver class com.mysql.jdbc.Driver; with Connector/J 8.x the class is com.mysql.cj.jdbc.Driver):
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>*****</version>
</dependency>
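If the project is built with sbt rather than Maven, the equivalent coordinate (version left unspecified, as above) would be:

```scala
// build.sbt
libraryDependencies += "mysql" % "mysql-connector-java" % "*****"
```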
4. Run results: