scala实战之spark读取mysql数据表并存放到mysql库中编程实例
Posted zfszhangyuan
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了scala实战之spark读取mysql数据表并存放到mysql库中编程实例相关的知识,希望对你有一定的参考价值。
今天简单讲解一下应用spark1.5.2相关读取mysql数据到DataFrame的接口以及将DF数据存放到mysql中接口实现实例
同样我们的编程开发环境是不需要安装spark的,但是需要一台安装了mysql的服务器,我这里直接在本机安装了一个mysql,还有就是scala的编程环境。
注意本次使用的spark版本是1.5.2,相关引用的包请参考下图:
先看代码吧
package JDBC_MySql
import java.util.Properties
import org.apache.spark.sql.SaveMode
import org.apache.spark.SparkConf, SparkContext
/**
* Created by zhoubh on 2016/7/20.
*/
object mysqlDB
case class zbh_test(day_id:String, prvnce_id:String,pv_cnts:Int)
def main(args: Array[String])
val conf = new SparkConf().setAppName("mysql").setMaster("local[4]")
val sc = new SparkContext(conf)
//sc.addJar("D:\\\\workspace\\\\sparkApp\\\\lib\\\\mysql-connector-java-5.0.8-bin.jar")
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
//定义mysql信息
val jdbcDF = sqlContext.read.format("jdbc").options(
Map("url"->"jdbc:mysql://localhost:3306/db_ldjs",
"dbtable"->"(select imei,region,city,company,name from tb_user_imei) as some_alias",
"driver"->"com.mysql.jdbc.Driver",
"user"-> "root",
//"partitionColumn"->"day_id",
"lowerBound"->"0",
"upperBound"-> "1000",
//"numPartitions"->"2",
"fetchSize"->"100",
"password"->"123456")).load()
jdbcDF.collect().take(20).foreach(println) //终端打印DF中的数据。
//jdbcDF.rdd.saveAsTextFile("C:/Users/zhoubh/Downloads/abi_sum")
val url="jdbc:mysql://localhost:3306/db_ldjs"
val prop=new Properties()
prop.setProperty("user","root")
prop.setProperty("password","123456")
jdbcDF.write.mode(SaveMode.Overwrite).jdbc(url,"zfs_test",prop) //写入数据库db_ldjs的表 zfs_test 中
//jdbcDF.write.mode(SaveMode.Append).jdbc(url,"zbh_test",prop) //你会发现SaveMode改成Append依然无济于事,表依然会被重建,为了解决这个问题,后期会另开博客讲解
//org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.saveTable(jdbcDF,url,"zbh_test",prop)
#然后进行groupby 操作,获取数据集合
// val abi_sum_area = abi_sum.groupBy("date_time", "area_name")
//
#计算数目,并根据数目进行降序排序
// val sorted = abi_sum_area.count().orderBy("count")
//
#显示前10条
// sorted.show(10)
//
#存储到文件(这里会有很多分片文件。。。)
// sorted.rdd.saveAsTextFile("C:/Users/zhoubh/Downloads/sparktest/flight_top")
//
//
#存储到mysql表里
// //sorted.write.jdbc(url,"table_name",prop)
下面来看看运行结果啥样:
数据库结果如下:
通过这段代码可以实现从mysql关系型数据库中直接读取数据转化成DataFrame参与到sparksql的分析当中这个意义是非常重大的,因为我们日常应用sparksql进行数据分析时经常会用到一些配置表,而这些配置定义表都是存在关系型数据库中,所以以后不用担心了。
另外这里还实现了DataFrame结果回写到mysql数据库中,虽然官方的spark源码的写入有些奇葩,设定的写死模式overwriter,也就是说你确定写入的表,他会重新创建,然后导入数据,这个用起来很不爽,后面博客将讲解如何改写源码,我要怎么写入就怎么写入。(这个意义也很重大,以后分析的结果就可以直接放mysql中,直接对外提供报表,哇 赞
最后感觉华哥的代码和讲解(一个个默默耕耘大数据多年的人)
以上是关于scala实战之spark读取mysql数据表并存放到mysql库中编程实例的主要内容,如果未能解决你的问题,请参考以下文章
日志分析实战之清洗日志小实例1:使用spark&Scala分析Apache日志
scala实战之SparkSQL应用实例(单表count和groupby多来源表join等)
HikariCP 源码分析之 leakDetectionThreshold 及实战解决 Spark/Scala 连接池泄漏