Writing to MySQL from Spark
Posted by kwz
This post collects three ways to write Spark computation results into MySQL.
Approach 1: parse with RDD transformations, then insert row by row over foreachPartition with raw JDBC
import java.sql.{DriverManager, Statement}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

private def singleDataSavemysql(sql: String) = {
  val dataFrame2: DataFrame = ss.sql(sql) // ss is the enclosing SparkSession
  val resultRDD = df2rdd(dataFrame2)
  val value: RDD[Map[String, Map[String, Map[String, String]]]] = resultRDD.map(diagLis => {
    var diagLisMap: Map[String, Map[String, Map[String, String]]] = Map()
    val diag: String = diagLis._1
    val lisText: String = diagLis._2
    if (!diagLisMap.contains(diag)) {
      // Split the lab text on blank lines (the delimiter regex was garbled in the
      // original post; this follows its own comment, "split the string on blank lines").
      val itemResults: Array[String] = lisText.split("\\n\\s*\\n")
      var itemSpecial: Map[String, Map[String, String]] = Map()
      for (i <- 0 until itemResults.length) {
        // First token is the item name, the rest are its readings
        // (both delimiters render as plain spaces in the source).
        val split1: Array[String] = itemResults(i).split(" ")
        var item: String = ""
        var special: Map[String, String] = Map()
        if (split1.length > 1) {
          for (j <- 0 until split1.length) {
            if (j == 0) {
              item = split1(j).replaceAll("【下沙】", "")
            } else {
              val splits: Array[String] = split1(j).split(" ")
              if (splits.length > 2) {
                val spell: String = splits(0).split(":")(0)
                val betw: String = splits(1)
                special += (spell -> betw)
              }
            }
          }
          itemSpecial += (item -> special)
        }
      }
      diagLisMap += (diag -> itemSpecial)
    }
    diagLisMap
  })

  val driver = "com.mysql.jdbc.Driver"
  val url = "jdbc:mysql://192.168.21.2351:3306/diagbot-app?useSSL=false"
  val username = "root"
  val password = "diagbot@20180822kwz"
  Class.forName(driver)

  // Open one connection per partition and insert row by row.
  value.foreachPartition(part => {
    val connectionMqcrm = DriverManager.getConnection(url, username, password)
    part.foreach(m => {
      for ((diag, iteamSpecal) <- m) {
        for ((rawIteam, specails) <- iteamSpecal) {
          val iteam: String = rawIteam.replace(":", "") // major item
          for ((it, bet) <- specails) {
            println(diag + " " + iteam + " " + it + " " + bet)
            // Straight quotes here; the curly quotes in the original post would be
            // rejected by MySQL. Concatenating values like this is injection-prone.
            val insertSql = "insert into doc_diag_lises(diag,iteam,it,bet) values ('" +
              diag + "','" + iteam + "','" + it + "','" + bet + "')"
            val statement: Statement = connectionMqcrm.createStatement()
            statement.executeUpdate(insertSql)
            statement.close()
          }
        }
      }
    })
    connectionMqcrm.close()
  })
}
// Helper: convert the two-column (diagnosis, lab text) DataFrame into an RDD of pairs.
private def df2rdd(dataFrame2: DataFrame) = {
val rowRdd: RDD[Row] = dataFrame2.rdd
val resultRDD: RDD[(String, String)] = rowRdd.map(row => {
val diag: String = row.get(0).toString
val liss: String = row.get(1).toString
(diag, liss)
})
resultRDD
}
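Approach 1 builds each INSERT by string concatenation and creates a new Statement per row, which is injection-prone and slow. A minimal alternative sketch, reusing value, url, username, password, and the doc_diag_lises table from above: bind parameters through a PreparedStatement and flush in batches (the batch size of 500 is an arbitrary choice, not from the original post):

import java.sql.DriverManager

// One PreparedStatement per partition, parameters bound per row, sent in batches.
value.foreachPartition { part =>
  val conn = DriverManager.getConnection(url, username, password)
  val ps = conn.prepareStatement(
    "insert into doc_diag_lises(diag, iteam, it, bet) values (?, ?, ?, ?)")
  var count = 0
  part.foreach { m =>
    for ((diag, itemSpecial) <- m; (iteam, specials) <- itemSpecial; (it, bet) <- specials) {
      ps.setString(1, diag)
      ps.setString(2, iteam.replace(":", ""))
      ps.setString(3, it)
      ps.setString(4, bet)
      ps.addBatch()
      count += 1
      if (count % 500 == 0) ps.executeBatch() // flush every 500 rows; tune as needed
    }
  }
  ps.executeBatch() // flush the remainder
  ps.close()
  conn.close()
}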
Approach 2: build a DataFrame with an explicit schema and write through the JDBC data source
import java.sql.Timestamp
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, TimestampType}

// Map the result RDD to an RDD[Row].
// arrayRDD (the analysis result) and spark (the SparkSession) come from the surrounding job.
val resultRowRDD = arrayRDD.map(p => Row(
  p._1.toInt,
  p._2.toString,
  new Timestamp(new java.util.Date().getTime)
))
// Specify each field's schema directly with StructType.
val resultSchema = StructType(
  List(
    StructField("verify_num", IntegerType, true),
    StructField("log_date", StringType, true),      // which day's logs produced the result
    StructField("create_time", TimestampType, true) // when the result was created
  )
)
// Assemble a new DataFrame.
val DF = spark.createDataFrame(resultRowRDD, resultSchema)
// Write the result to MySQL.
DF.write.mode("append")
  .format("jdbc")
  .option("url", "jdbc:mysql://192.168.1.97:3306/xiang_log")
  .option("dbtable", "verify") // table name
  .option("user", "root")
  .option("password", "123456")
  .save()
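If the driver class is not picked up automatically from the URL, or per-row inserts become the bottleneck, Spark's JDBC data source also accepts explicit driver and batchsize options. A sketch of the same write with both set (the values shown are illustrative, not from the original post):

// Same write, with the driver named explicitly and a larger JDBC batch.
DF.write.mode("append")
  .format("jdbc")
  .option("url", "jdbc:mysql://192.168.1.97:3306/xiang_log")
  .option("dbtable", "verify")
  .option("user", "root")
  .option("password", "123456")
  .option("driver", "com.mysql.jdbc.Driver") // explicit JDBC driver class
  .option("batchsize", "1000")               // rows per JDBC batch insert
  .save()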
Approach 3: map to a case class, convert with toDF, and write via DataFrameWriter.jdbc
import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}

case class ManxingweiyanLis(diseaseName: String, cardId: String, lisName: String, lisResult: String, lisAndResult: String)

object jangganHive {
  val sparkConf: SparkConf = new SparkConf().setAppName(jangganHive.getClass.getSimpleName)
  val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
  val url = "jdbc:mysql://192.168.2.232:3306/jianggan?Unicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false"

  def main(args: Array[String]): Unit = {
    assc
    sparkSession.stop()
  }

  def assc: Unit = {
    import sparkSession.implicits._
    import sparkSession.sql
    // Single quotes around the SQL literals; the doubled double quotes in the
    // original post would not compile.
    val df: DataFrame = sql("select cardId,lisName,lisresult,lisbet from janggan.gaozhixuelis where lisbet != '' and lisName != '清洁度'")
    val rdd: RDD[Row] = df.rdd
    // Classify each lab result against its reference range.
    val operatorLis: RDD[(String, String)] = rdd.map(row => {
      var i = ""
      val cardID: String = row.get(0).toString
      val lisName: String = row.get(1).toString
      try {
        val lisResult: String = row.get(2).toString
        val lisBet: String = row.get(3).toString
        if (lisResult.contains("+")) {
          (cardID + "&" + lisName, "阳性") // positive
        } else if (lisResult.contains("阴性") || lisResult.contains("-")) {
          (cardID + "&" + lisName, "阴性") // negative
        } else {
          // Reference range such as "3.5-5.5". The original alternation "-|-" was
          // garbled (both branches render identically), so split on a plain hyphen.
          val splits: Array[String] = lisBet.split("-")
          if (lisResult.toDouble > splits(1).toDouble) {
            i = "升高" // elevated
          } else if (lisResult.toDouble < splits(0).toDouble) {
            i = "降低" // decreased
          } else {
            i = "正常" // normal
          }
          (cardID + "&" + lisName, i)
        }
      } catch {
        case e: Exception =>
          (cardID + "&" + lisName, "数据异常") // malformed data
      }
    })
    val frame: DataFrame = operatorLis.map(x => {
      ManxingweiyanLis("高脂血症", x._1.split("&")(0), x._1.split("&")(1), x._2, x._1.split("&")(1) + x._2)
    }).toDF()
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "123456")
    properties.put("driver", "com.mysql.jdbc.Driver")
    frame.write.mode(SaveMode.Append).jdbc(url, "exceptionLis", properties)
  }
}
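To spot-check what landed in MySQL, the read side of the same JDBC data source can reuse url and properties from above. A minimal sketch, run inside assc after the write:

// Read the freshly written table back and peek at a few rows.
val written: DataFrame = sparkSession.read.jdbc(url, "exceptionLis", properties)
written.show(10)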