SparkStreaming 写入数据到mysql

Posted 欢fion

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkStreaming 写入数据到mysql相关的知识,希望对你有一定的参考价值。

使用idea 编码
package streaming
 
import java.sql.DriverManager
 
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
 
object SvaeTomysql {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("savetomysql")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(5))
 
    val lines = ssc.socketTextStream("master", 9999)
    val wordcounts = lines.flatMap(x => x.split(" ")).map((_, 1)).reduceByKey(_ + _)
 
    wordcounts.foreachRDD(rdd => rdd.foreachPartition(line => {
      Class.forName("com.mysql.jdbc.Driver")
      //获取mysql连接
      val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "123456")
      //把数据写入mysql
      try {
        for (row <- line) {
          val sql = "insert into wordcount(titleName,count)values(‘" + row._1 + "‘,‘" + row._2 + "‘)"
          conn.prepareStatement(sql).executeUpdate()
        }
      } finally {
        conn.close()
      }
    }))
 
    ssc.start()
    ssc.awaitTermination()
 
  }
}

 

 
 
在这之前先创建数据库;
create database test;
create table if not exists wordcount (titleName varchar(100) not null,count int not null);
mvn assembly:assembly 打包上传到虚拟机上savetomysql.sh,执行sh savetomysql.sh
新开一个对话窗,nc -lk 9999
输入单词:
技术图片
然后进入对应 的数据库查看数据:
技术图片

 

以上是关于SparkStreaming 写入数据到mysql的主要内容,如果未能解决你的问题,请参考以下文章

python爬虫等获取实时数据+Flume+Kafka+Spark Streaming+mysql+Echarts实现数据动态实时采集分析展示

SparkStreaming读取kafka生产的数据,进行累计词频统计后将最新结果存入MySQL数据库

python爬虫等获取实时数据+Flume+Kafka+Spark Streaming+mysql+Echarts实现数据动态实时采集分析展示

将 Spark Streaming 输出写入 HDFS 时跳过数据

spark streaming怎么将计算结果写入oracle

SparkStreaming整合kafka