Reading a MySQL Database from Spark: Examples

Posted by 向前展望,倒后推理


1. Spark can read from many data sources; this example reads from MySQL.

2. Prerequisites:

Scala, IDEA, and the mysql-connector-java driver; check https://mvnrepository.com/ for matching versions.

3. Reading data, method 1:

object WordFreq {
  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder().master("local").appName("getDatafromMysql")
      .config("spark.sql.shuffle.partitions", 1).getOrCreate()

    // JDBC connection properties; com.mysql.jdbc.Driver is the
    // Connector/J 5.x class (with 8.x use com.mysql.cj.jdbc.Driver)
    val properties: Properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "root")
    properties.setProperty("driver", "com.mysql.jdbc.Driver")

    // Variant 1: read a whole table
    val person: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/acc", "ttt", properties)
    person.show()
    // Variant 2: read the result of a subquery (it must be aliased, here as T)
    spark.read.jdbc("jdbc:mysql://localhost:3306/acc", "(select * from ut_tt) T", properties).show()
  }
}
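For large tables, the same jdbc reader has an overload that splits the read into parallel partitions. A minimal sketch, reusing spark and properties from above and assuming the ttt table has a numeric id column (the column name and bounds here are hypothetical):

```scala
// Partitioned JDBC read: Spark issues one query per partition,
// slicing the assumed id range [1, 10000] into 4 pieces
val partitioned: DataFrame = spark.read.jdbc(
  "jdbc:mysql://localhost:3306/acc",
  "ttt",
  columnName = "id",      // hypothetical numeric partition column
  lowerBound = 1L,
  upperBound = 10000L,
  numPartitions = 4,
  connectionProperties = properties)
partitioned.show()
```

The bounds only steer how the id range is split; rows outside them are still read, so they do not filter the result.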

Reading data, method 2:

val spark: SparkSession = SparkSession.builder().master("local").appName("getDatafromMysql")
  .config("spark.sql.shuffle.partitions", 1).getOrCreate()
// All connection settings passed as a single options map
val map: Map[String, String] = Map[String, String](
  "url" -> "jdbc:mysql://localhost:3306/yyyy",
  "driver" -> "com.mysql.jdbc.Driver",
  "user" -> "root",
  "password" -> "root",
  "dbtable" -> "notice")
val score: DataFrame = spark.read.format("jdbc").options(map).load()
score.show()

Reading data, method 3:

val spark: SparkSession = SparkSession.builder().master("local").appName("getDatafromMysql")
  .config("spark.sql.shuffle.partitions", 1).getOrCreate()
// Set each JDBC option individually on the DataFrameReader
val reader: DataFrameReader = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/yyyyyy")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("user", "root")
  .option("password", "root")
  .option("dbtable", "notice")

val source2: DataFrame = reader.load()

source2.show()

Writing data into MySQL:

// Write a query result into another MySQL table
val spark: SparkSession = SparkSession.builder().master("local").appName("getDatafromMysql")
  .config("spark.sql.shuffle.partitions", 1).getOrCreate()

val result = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/ttttttt") // ttttttt is the database name
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "notice")                          // notice is the table name
  .option("user", "root").option("password", "root").load()

val properties: Properties = new Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "root")
properties.setProperty("driver", "com.mysql.jdbc.Driver")
properties.setProperty("characterEncoding", "utf8")
// Append the rows to notice_copy1 in database iii
result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/iii", "notice_copy1", properties)
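SaveMode.Append above adds rows to an existing table. The other modes can be swapped in the same way; for example, a sketch that replaces the target table instead (reusing result and properties from above):

```scala
// SaveMode.Overwrite drops and recreates notice_copy1;
// ErrorIfExists (the default) and Ignore are the remaining modes
result.write.mode(SaveMode.Overwrite)
  .jdbc("jdbc:mysql://localhost:3306/iii", "notice_copy1", properties)
```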

Required imports:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.DataFrameReader
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import java.util.Properties

In the pom file, add the mysql-connector-java dependency:

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>*****</version>
</dependency>
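If the project is built with sbt rather than Maven, the equivalent dependency line would look like this (the version shown is only an example; pick one that matches your MySQL server):

```scala
// build.sbt — example version, replace as needed
libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.47"
```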

4. Result: each example prints the contents of the queried table via show().
