spark mysql读写

Posted 牵牛花

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark mysql读写相关的知识,希望对你有一定的参考价值。

val data2mysql2 = (iterator: Iterator[(String, Int)]) => {
    var conn: Connection = null;
    var ps: PreparedStatement = null
    val sql = "Insert into location_info(location,counts,accesse_date) values(?,?,?)"

    try {
      conn = DriverManager.getConnection("jdbc://localhist:3306/bigdata","root","root")
      //整个分区的数据用了一个conn
      iterator.foreach(line =>{
        ps = conn.prepareStatement(sql)
        ps.setString(1,line._1)
        ps.setInt(2,line._2)
        ps.setDate(3,new Date(System.currentTimeMillis()))
        ps.executeUpdate()
      })

    } catch {
      case e: Exception => println("Mysql Exception")
    } finally {
      if (ps != null) ps.close()
      if (conn != null) conn.close()
    }
rddres2.foreachPartition(data2MySQL)
  def mysql2Spark(){
    val conf = new SparkConf().setAppName("JdbcRDDDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val connection = () => {
      Class.forName("com.mysql.jdbc.Driver").newInstance()
      DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root")
    }
    val jdbcRDD = new JdbcRDD(
      sc,
      connection,
      //location_info(location,counts
      "SELECT id, location FROM location_info where id >= ? AND id <= ?",
      1, 4, 2,
      r => {
        val id = r.getInt(1)
        val code = r.getString(2)
        (id, code)
      }
    )
    val jrdd = jdbcRDD.collect()
    println(jdbcRDD.collect().toBuffer)
    sc.stop()
  }

 

以上是关于spark mysql读写的主要内容,如果未能解决你的问题,请参考以下文章

spark mysql读写

Spark学习笔记——读写HDFS

pyspark对Mysql数据库进行读写

在这个 spark 代码片段中 ordering.by 是啥意思?

SparkSQL 读写 Mysql

python+spark程序代码片段