SparkStreaming 写入数据到mysql
Posted 欢fion
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkStreaming 写入数据到mysql相关的知识,希望对你有一定的参考价值。
使用idea 编码
package streaming import java.sql.DriverManager import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} object SvaeTomysql { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("savetomysql") val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(5)) val lines = ssc.socketTextStream("master", 9999) val wordcounts = lines.flatMap(x => x.split(" ")).map((_, 1)).reduceByKey(_ + _) wordcounts.foreachRDD(rdd => rdd.foreachPartition(line => { Class.forName("com.mysql.jdbc.Driver") //获取mysql连接 val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "123456") //把数据写入mysql try { for (row <- line) { val sql = "insert into wordcount(titleName,count)values(‘" + row._1 + "‘,‘" + row._2 + "‘)" conn.prepareStatement(sql).executeUpdate() } } finally { conn.close() } })) ssc.start() ssc.awaitTermination() } }
在这之前先创建数据库;
create database test;
create table if not exists wordcount (titleName varchar(100) not null,count int not null);
mvn assembly:assembly 打包上传到虚拟机上savetomysql.sh,执行sh savetomysql.sh
新开一个对话窗,nc -lk 9999
输入单词:
然后进入对应 的数据库查看数据:
以上是关于SparkStreaming 写入数据到mysql的主要内容,如果未能解决你的问题,请参考以下文章
python爬虫等获取实时数据+Flume+Kafka+Spark Streaming+mysql+Echarts实现数据动态实时采集分析展示
SparkStreaming读取kafka生产的数据,进行累计词频统计后将最新结果存入MySQL数据库
python爬虫等获取实时数据+Flume+Kafka+Spark Streaming+mysql+Echarts实现数据动态实时采集分析展示
将 Spark Streaming 输出写入 HDFS 时跳过数据