scala实战之spark读取mysql数据表并存放到mysql库中编程实例

Posted zfszhangyuan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了scala实战之spark读取mysql数据表并存放到mysql库中编程实例相关的知识,希望对你有一定的参考价值。

今天简单讲解一下应用spark1.5.2相关读取mysql数据到DataFrame的接口以及将DF数据存放到mysql中接口实现实例

同样我们的编程开发环境是不需要安装spark的,但是需要一台安装了mysql的服务器,我这里直接在本机安装了一个mysql,还有就是scala的编程环境。

注意本次使用的spark版本是1.5.2,相关引用的包请参考下图:

先看代码吧

package JDBC_MySql

import java.util.Properties

import org.apache.spark.sql.SaveMode
import org.apache.spark.SparkConf, SparkContext

/**
  * Created by zhoubh on 2016/7/20.
  */
object mysqlDB 

  case class zbh_test(day_id:String, prvnce_id:String,pv_cnts:Int)

  def main(args: Array[String]) 


    val conf = new SparkConf().setAppName("mysql").setMaster("local[4]")
    val sc = new SparkContext(conf)
    //sc.addJar("D:\\\\workspace\\\\sparkApp\\\\lib\\\\mysql-connector-java-5.0.8-bin.jar")
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)



     //定义mysql信息
    val jdbcDF = sqlContext.read.format("jdbc").options(
      Map("url"->"jdbc:mysql://localhost:3306/db_ldjs",
    "dbtable"->"(select imei,region,city,company,name from tb_user_imei) as some_alias",
    "driver"->"com.mysql.jdbc.Driver",
    "user"-> "root",
    //"partitionColumn"->"day_id",
    "lowerBound"->"0",
    "upperBound"-> "1000",
    //"numPartitions"->"2",
    "fetchSize"->"100",
    "password"->"123456")).load()


    jdbcDF.collect().take(20).foreach(println) //终端打印DF中的数据。
    //jdbcDF.rdd.saveAsTextFile("C:/Users/zhoubh/Downloads/abi_sum")
    val url="jdbc:mysql://localhost:3306/db_ldjs"
    val prop=new Properties()
    prop.setProperty("user","root")
    prop.setProperty("password","123456")
    jdbcDF.write.mode(SaveMode.Overwrite).jdbc(url,"zfs_test",prop) //写入数据库db_ldjs的表 zfs_test 中
    //jdbcDF.write.mode(SaveMode.Append).jdbc(url,"zbh_test",prop)  //你会发现SaveMode改成Append依然无济于事,表依然会被重建,为了解决这个问题,后期会另开博客讲解

     //org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.saveTable(jdbcDF,url,"zbh_test",prop)
    #然后进行groupby 操作,获取数据集合
//    val abi_sum_area = abi_sum.groupBy("date_time", "area_name")
//
    #计算数目,并根据数目进行降序排序
//    val sorted = abi_sum_area.count().orderBy("count")
//
    #显示前10条
//    sorted.show(10)
//
    #存储到文件(这里会有很多分片文件。。。)
//    sorted.rdd.saveAsTextFile("C:/Users/zhoubh/Downloads/sparktest/flight_top")
//
//
    #存储到mysql表里
//    //sorted.write.jdbc(url,"table_name",prop)

  

下面来看看运行结果啥样:


数据库结果如下:

通过这段代码可以实现从mysql关系型数据库中直接读取数据转化成DataFrame参与到sparksql的分析当中这个意义是非常重大的,因为我们日常应用sparksql进行数据分析时经常会用到一些配置表,而这些配置定义表都是存在关系型数据库中,所以以后不用担心了。

另外这里还实现了DataFrame结果回写到mysql数据库中,虽然官方的spark源码的写入有些奇葩,设定的写死模式overwriter,也就是说你确定写入的表,他会重新创建,然后导入数据,这个用起来很不爽,后面博客将讲解如何改写源码,我要怎么写入就怎么写入。(这个意义也很重大,以后分析的结果就可以直接放mysql中,直接对外提供报表,哇 赞


最后感觉华哥的代码和讲解(一个个默默耕耘大数据多年的人)


以上是关于scala实战之spark读取mysql数据表并存放到mysql库中编程实例的主要内容,如果未能解决你的问题,请参考以下文章

日志分析实战之清洗日志小实例1:使用spark&Scala分析Apache日志

scala实战之spark用户在线时长和登录次数统计实例

大数据实战之spark安装部署

scala实战之spark用户在线时长和登录次数统计实例

scala实战之SparkSQL应用实例(单表count和groupby多来源表join等)

HikariCP 源码分析之 leakDetectionThreshold 及实战解决 Spark/Scala 连接池泄漏