spark mysql读写
Posted 牵牛花
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark mysql读写相关的知识,希望对你有一定的参考价值。
// Writes one RDD partition of (location, count) pairs into MySQL.
// A single Connection and a single PreparedStatement are created per
// partition, reused for every row, and released in the finally block.
val data2mysql2 = (iterator: Iterator[(String, Int)]) => {
  var conn: Connection = null
  var ps: PreparedStatement = null
  val sql = "Insert into location_info(location,counts,accesse_date) values(?,?,?)"
  try {
    // Fixed URL: the original "jdbc://localhist:3306/bigdata" was missing the
    // "mysql" subprotocol and misspelled the host, so DriverManager could
    // never resolve a driver for it.
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root")
    // Prepare the statement ONCE per partition. The original re-prepared it
    // inside foreach for every row, leaking statements (only the last one
    // was ever closed in finally).
    ps = conn.prepareStatement(sql)
    iterator.foreach { case (location, counts) =>
      ps.setString(1, location)
      ps.setInt(2, counts)
      ps.setDate(3, new Date(System.currentTimeMillis()))
      ps.executeUpdate()
    }
  } catch {
    // Let fatal errors (OOM, interrupts) propagate; report everything else
    // with the actual cause instead of a fixed, detail-free string.
    case scala.util.control.NonFatal(e) =>
      println(s"Mysql Exception: ${e.getMessage}")
  } finally {
    if (ps != null) ps.close()
    if (conn != null) conn.close()
  }
}

// Run the writer once per partition so each executor opens one connection.
// The original referenced the undefined identifier `data2MySQL`
// (case/name mismatch with `data2mysql2` defined above).
rddres2.foreachPartition(data2mysql2)
/**
 * Reads rows from the MySQL table `location_info` into Spark via [[JdbcRDD]]
 * and prints them, then stops the SparkContext.
 *
 * The query's two `?` placeholders are bound by JdbcRDD to the partition's
 * id range (lowerBound = 1, upperBound = 4, numPartitions = 2).
 */
def mysql2Spark(): Unit = {
  val conf = new SparkConf().setAppName("JdbcRDDDemo").setMaster("local[2]")
  val sc = new SparkContext(conf)
  // Connection factory: invoked lazily on each executor, so the driver class
  // is registered where the connection is actually opened.
  val connection = () => {
    Class.forName("com.mysql.jdbc.Driver").newInstance()
    DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root")
  }
  val jdbcRDD = new JdbcRDD(
    sc,
    connection,
    // Bound query over location_info(id, location); ? ? are the id range.
    "SELECT id, location FROM location_info where id >= ? AND id <= ?",
    1, // lowerBound
    4, // upperBound
    2, // numPartitions
    r => {
      val id = r.getInt(1)
      val code = r.getString(2)
      (id, code)
    }
  )
  // Collect ONCE. The original called collect() twice (the first result was
  // bound to an unused val), running the whole Spark job twice.
  val rows = jdbcRDD.collect()
  println(rows.toBuffer)
  sc.stop()
}
以上是关于spark mysql读写的主要内容,如果未能解决你的问题,请参考以下文章