spark写入mysql

Posted kwz

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark写入mysql相关的知识,希望对你有一定的参考价值。

第一种方式:在 foreachPartition 中通过 JDBC 连接逐条插入

/**
 * Runs `sql`, parses the lab-report text of every result row and inserts the parsed
 * entries into the MySQL table `doc_diag_lises`, one row per (diagnosis, item, sub-item).
 *
 * The query result is read through `df2rdd`, i.e. column 0 is used as the diagnosis and
 * column 1 as the raw report text. The text is split into item blocks; the first line of a
 * block is the item name and every following tab-separated line contributes a
 * `name -> value` entry taken from its first two columns.
 *
 * @param sql Spark SQL query whose first two columns are the diagnosis and the report text
 */
private def singleDataSavemysql(sql: String) = {
  val dataFrame2: DataFrame = ss.sql(sql)
  val resultRDD = df2rdd(dataFrame2)

  // One single-entry map per input row: diagnosis -> (item -> (sub-item -> value)).
  val value: RDD[Map[String, Map[String, Map[String, String]]]] = resultRDD.map { case (diag, lisText) =>
    // Item blocks are separated by a line break followed by more whitespace (blank lines).
    // NOTE(review): the escape sequences in the split patterns were reconstructed from a
    // copy in which they had been turned into raw line breaks -- confirm against real data.
    val itemSpecial: Map[String, Map[String, String]] =
      lisText.split("(\r\n|\n)\\s+").foldLeft(Map.empty[String, Map[String, String]]) { (items, block) =>
        val lines: Array[String] = block.split("\n")
        if (lines.length > 1) {
          // First line is the item name; the remaining lines are its sub-items.
          val item: String = lines.head.replaceAll("【下沙】", "")
          val special: Map[String, String] = lines.tail.foldLeft(Map.empty[String, String]) { (entries, line) =>
            val columns: Array[String] = line.split("\t")
            // Only lines with at least three tab-separated columns are used.
            if (columns.length > 2) {
              // headOption: a name made only of separators splits to an empty array.
              columns(0).split(":").headOption.fold(entries)(name => entries + (name -> columns(1)))
            } else {
              entries
            }
          }
          items + (item -> special)
        } else {
          items
        }
      }
    Map(diag -> itemSpecial)
  }

  val driver = "com.mysql.jdbc.Driver"
  // NOTE(review): the host part of this URL is not a valid IPv4 address (octet "2351");
  // it is kept unchanged here -- replace it with the real host before running.
  val url = "jdbc:mysql://192.168.21.2351:3306/diagbot-app?useSSL=false"
  // NOTE(review): credentials are hard-coded; consider loading them from configuration.
  val username = "root"
  val password = "diagbot@20180822kwz"
  Class.forName(driver)

  // Parameterised insert: values are bound by the driver instead of being concatenated
  // into the SQL text, so quotes inside the data cannot break or alter the statement.
  val insertSql = "insert into doc_diag_lises(diag,iteam,it,bet) values (?,?,?,?)"

  value.foreachPartition { records =>
    // One connection and one prepared statement per partition, both always released.
    val connection = DriverManager.getConnection(url, username, password)
    try {
      val statement = connection.prepareStatement(insertSql)
      try {
        for {
          record <- records
          (diag, items) <- record
          (rawItem, specials) <- items
          (it, bet) <- specials
        } {
          val iteam: String = rawItem.replace(":", "") // top-level item name
          println(diag + "\t" + iteam + "\t" + it + "\t" + bet)
          statement.setString(1, diag)
          statement.setString(2, iteam)
          statement.setString(3, it)
          statement.setString(4, bet)
          statement.executeUpdate()
        }
      } finally {
        statement.close()
      }
    } finally {
      connection.close()
    }
  }
}

/**
 * Turns a DataFrame into an RDD of string pairs built from the `toString` of the first two
 * columns of every row.
 *
 * @param dataFrame2 DataFrame with at least two columns
 * @return RDD of (column 0, column 1) as strings
 */
private def df2rdd(dataFrame2: DataFrame) = {
  dataFrame2.rdd.map { row =>
    (row.get(0).toString, row.get(1).toString)
  }
}
 

第二种方式:使用 DataFrame.write 的 jdbc 数据源(format("jdbc"))写入

// Map each result tuple to a Row: (count as Int, log date as String, creation timestamp).
val resultRowRDD = arrayRDD.map { p =>
  val createdAt = new Timestamp(System.currentTimeMillis())
  Row(p._1.toInt, p._2.toString, createdAt)
}

// Declare the schema of the rows explicitly with a StructType.
val resultSchema = StructType(
  List(
    StructField("verify_num", IntegerType, nullable = true),
    StructField("log_date", StringType, nullable = true), // the day whose logs produced this result
    StructField("create_time", TimestampType, nullable = true) // when this result row was created
  )
)

// Assemble the DataFrame from the rows and the schema.
val DF = spark.createDataFrame(resultRowRDD, resultSchema)

// Append the result to MySQL through the JDBC data source.
DF.write
  .mode("append")
  .format("jdbc")
  .options(Map(
    "url" -> "jdbc:mysql://192.168.1.97:3306/xiang_log",
    "dbtable" -> "verify", // target table name
    "user" -> "root",
    "password" -> "123456"
  ))
  .save()

第三种方式:使用 DataFrameWriter.jdbc(url, table, properties) 写入

/**
 * One labelled lab-test row.
 *
 * @param diseaseName  disease label attached to the row
 * @param cardId       card identifier of the record
 * @param lisName      name of the lab test
 * @param lisResult    label computed for the test result
 * @param lisAndResult test name and result label combined into one string
 */
case class ManxingweiyanLis(
    diseaseName: String,
    cardId: String,
    lisName: String,
    lisResult: String,
    lisAndResult: String
)
/**
 * Spark job that reads lab results from the Hive table `janggan.gaozhixuelis`, labels every
 * result and appends the labelled rows to the MySQL table `exceptionLis`.
 */
object jangganHive {
  val sparkConf: SparkConf = new SparkConf().setAppName(jangganHive.getClass.getSimpleName)
  val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
  // NOTE(review): MySQL Connector/J documents the property as `useUnicode`; `Unicode=true`
  // looks like a typo -- left unchanged here, confirm against the driver version in use.
  val url = "jdbc:mysql://192.168.2.232:3306/jianggan?Unicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false"

  /** Entry point: runs the job and always stops the Spark session, even if the job fails. */
  def main(args: Array[String]): Unit = {
    try assc
    finally sparkSession.stop()
  }

  /**
   * Labels every selected lab result and appends the outcome to MySQL.
   *
   * Labelling rules, applied in this order to the result column:
   *  - contains "+"                      -> "阳性"
   *  - contains "阴性" or "-"            -> "阴性"
   *  - numeric, above the upper bound    -> "升高"
   *  - numeric, below the lower bound    -> "降低"
   *  - numeric, otherwise                -> "正常"
   *  - anything that fails to parse      -> "数据异常"
   *
   * The bounds come from splitting the reference-range column on "-".
   */
  def assc: Unit = {
    import scala.util.control.NonFatal
    import sparkSession.implicits._
    import sparkSession.sql

    // String literals inside the query use single quotes so the Scala literal stays well-formed.
    val df: DataFrame = sql(
      "select cardId,lisName,lisresult,lisbet from janggan.gaozhixuelis where lisbet != '' and lisName != '清洁度'")

    // The card id and test name are carried as separate fields instead of being joined with
    // "&" and split again, which failed for empty test names and for values containing "&".
    val labelled: RDD[ManxingweiyanLis] = df.rdd.map { row =>
      val cardId: String = row.get(0).toString
      val lisName: String = row.get(1).toString
      val label: String =
        try {
          val lisResult: String = row.get(2).toString
          val lisBet: String = row.get(3).toString
          if (lisResult.contains("+")) {
            "阳性"
          } else if (lisResult.contains("阴性") || lisResult.contains("-")) {
            "阴性"
          } else {
            // NOTE(review): both alternatives of this pattern are the same character; the
            // second one was presumably a different dash originally -- confirm.
            val bounds: Array[String] = lisBet.split("-|-")
            val measured: Double = lisResult.toDouble
            if (measured > bounds(1).toDouble) "升高"
            else if (measured < bounds(0).toDouble) "降低"
            else "正常"
          }
        } catch {
          // Null cells, non-numeric values and malformed ranges all end up here.
          case NonFatal(_) => "数据异常"
        }
      ManxingweiyanLis("高脂血症", cardId, lisName, label, lisName + label)
    }

    val frame: DataFrame = labelled.toDF()

    val proprttity = new Properties()
    proprttity.setProperty("user", "root")
    proprttity.setProperty("password", "123456")
    proprttity.setProperty("driver", "com.mysql.jdbc.Driver")
    frame.write.mode(SaveMode.Append).jdbc(url, "exceptionLis", proprttity)
  }
}

 

以上是关于spark写入mysql的主要内容,如果未能解决你的问题,请参考以下文章

为啥从 Spark 写入 Vertica DB 比从 Spark 写入 MySQL 需要更长的时间?

spark将数据写入mysql的共享表

Spark:将DataFrame写入Mysql

Spark SQL大数据处理并写入Elasticsearch

Spark操作dataFrame进行写入mysql,自定义sql的方式

Spark 写入Mysql