Spark (Structured Streaming): Reading Data from MySQL and Writing to a MySQL Table (Auto-Increment ID)
Posted by 一只楠喃
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties
import org.apache.spark.sql.{DataFrame, DataFrameReader, Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
object RWSpark {
  def main(args: Array[String]): Unit = {
    // Build the Spark environment
    val spark = SparkSession.builder()
      .appName(this.getClass.getCanonicalName.stripSuffix("$"))
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", "2")
      .config("spark.testing.memory", "477000000")
      .getOrCreate()
    /** Read MySQL data, approach one: spark.read.jdbc */
    val url: String = "jdbc:mysql://node1:3306/test"
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "123456")
    val inputDF: DataFrame = spark.read
      .jdbc(url, "sparkOne", prop)
    /** Read MySQL data, approach two: format("jdbc") with options */
    val reader: DataFrameReader = spark.read
      .format("jdbc").option("url", url)
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", "sparkOne")
    val inputDF2: DataFrame = reader.load()
    // Write to MySQL when id is an auto-increment column
    inputDF.coalesce(1)
      .foreachPartition { (iter: Iterator[Row]) =>
        // Runs on the executors: register the MySQL driver inside each partition
        Class.forName("com.mysql.cj.jdbc.Driver")
        var con: Connection = null
        var psmt: PreparedStatement = null
        try {
          con = DriverManager.getConnection(
            "jdbc:mysql://node1:3306/test?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
            "root",
            "123456"
          )
          // Pass null for id so MySQL assigns the auto-increment value
          psmt = con.prepareStatement(
            """
              |INSERT INTO test.sparkOne(id, name, age)
              | VALUES (null, ?, ?)
            """.stripMargin)
          // Assumes sparkOne has a String column `name` and an Int column `age`
          iter.foreach { row =>
            psmt.setString(1, row.getAs[String]("name"))
            psmt.setInt(2, row.getAs[Int]("age"))
            psmt.addBatch()
          }
          psmt.executeBatch()
        } finally {
          if (psmt != null) psmt.close()
          if (con != null) con.close()
        }
      }
  }
}
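As a side note: when the id column is auto-increment, the hand-written JDBC loop above is not strictly required. Spark's built-in JDBC writer can append rows directly, as long as the id column is dropped first so that MySQL fills it in. A minimal sketch, reusing the inputDF, url, and prop values defined in main above:

import org.apache.spark.sql.SaveMode

// Drop id so MySQL assigns the auto-increment value, then append through the JDBC writer.
inputDF
  .drop("id")
  .write
  .mode(SaveMode.Append)
  .jdbc(url, "sparkOne", prop)

The foreachPartition version is still worth keeping when per-row SQL control is needed (custom batching, upserts), since each partition reuses a single connection and sends its inserts as one batch.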