Spark Scala 将数据保存到Mysql
Posted 点点滴滴的记录.
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark Scala 将数据保存到Mysql相关的知识,希望对你有一定的参考价值。
1添加pom.xml配置
<build> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> <configuration> <args> <arg>-dependencyfile</arg> <arg>${project.build.directory}/.scala_dependencies</arg> </args> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass></mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build>
scala代码
package com.ScalcForSpark.Core import java.sql.{DriverManager} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Tomysql extends App { //创建Spark连接 val sparkConf=new SparkConf().setAppName("ToMysal").setMaster("local[2]") val sc=new SparkContext(sparkConf) //读取本地文件,并且每行数据用逗号拆分 private val rdd: RDD[Array[String]] = sc.textFile("F:\Work\demodata\person.txt").map(_.split(",")) //按照分区进行遍历 rdd.foreachPartition(i=>{ //创建mysql连接 val connection= DriverManager.getConnection("jdbc:mysql://localhost/spark?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC","root","Aa123!@#") val sql="insert into user(id,name,age) values(?,?,?)" val ps = connection.prepareStatement(sql) for(a<-i){ ps.setInt(1,a(0).toInt) ps.setString(2,a(1)) ps.setInt(3,a(2).toInt) ps.addBatch() } ps.executeBatch() ps.close() connection.close() }) }
以上是关于Spark Scala 将数据保存到Mysql的主要内容,如果未能解决你的问题,请参考以下文章
如何从 SocketTCP 获取数据以保存到 Spark Scala 中的数据帧?
scala实战之spark源码修改(能够将DataFrame按字段增量写入mysql数据表)