071 SparkStreaming与SparkSQL集成

Posted 曹军

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了071 SparkStreaming与SparkSQL集成相关的知识,希望对你有一定的参考价值。

1.说明

  虽然DStream可以转换成RDD,但是如果比较复杂,可以考虑使用SparkSQL。

 

2.集成方式

  Streaming和Core整合:
    transform或者foreachRDD方法
  Core和SQL整合:
    RDD <==> DataFrame 互换

 

3.程序

 1 package com.sql.it
 2 import org.apache.spark.sql.SQLContext
 3 import org.apache.spark.storage.StorageLevel
 4 import org.apache.spark.streaming.kafka.KafkaUtils
 5 import org.apache.spark.streaming.{Seconds, StreamingContext}
 6 import org.apache.spark.{SparkConf, SparkContext}
 7 object StreamingSQL {
 8   def main(args: Array[String]): Unit = {
 9     val conf = new SparkConf()
10       .setAppName("StreamingWindowOfKafka22")
11       .setMaster("local[*]")
12     val sc = SparkContext.getOrCreate(conf)
13     val ssc = new StreamingContext(sc, Seconds(5))
14     // 当调用updateStateByKey函数API的时候,必须给定checkpoint dir
15     // 路径对应的文件夹不能存在
16     ssc.checkpoint("hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/9421151351")
17 
18     val kafkaParams = Map(
19       "group.id" -> "streaming-kafka-78912151",
20       "zookeeper.connect" -> "linux-hadoop01.ibeifeng.com:2181/kafka",
21       "auto.offset.reset" -> "smallest"
22     )
23     val topics = Map("beifeng" -> 4) // topics中value是读取数据的线程数量,所以必须大于等于1
24     val dstream = KafkaUtils.createStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
25       ssc, // 给定SparkStreaming上下文
26       kafkaParams, // 给定连接kafka的参数信息 ===> 通过Kafka HighLevelConsumerAPI连接
27       topics, // 给定读取对应topic的名称以及读取数据的线程数量
28       StorageLevel.MEMORY_AND_DISK_2 // 指定数据接收器接收到kafka的数据后保存的存储级别
29     ).map(_._2)
30 
31     /**
32       * transform:将DStream的操作转换为RDD的操作,调用该api最终只需要返回一个新的RDD即可
33       */
34     dstream.transform(rdd => {
35       // 使用sql统计wordcoount
36       val sqlContext = SQLContextSingelton.getSQLContext(rdd.sparkContext)
37       import sqlContext.implicits._
38       val procedRDD = rdd.filter(_.nonEmpty).flatMap(_.split(" ").map((_, 1)))
39       procedRDD.toDF("word", "c").registerTempTable("tb_word")
40       val resultRDD = sqlContext.sql("select word, count(c) as vc from tb_word group by word").map(row => {
41         val word = row.getAs[String]("word")
42         val count = row.getAs[Long]("vc")
43         (word, count)
44       })
45 
46       resultRDD
47     }).print()
48 
49     // 启动开始处理
50     ssc.start()
51     ssc.awaitTermination() // 等等结束,监控一个线程的中断操作
52   }
53 }
54 
55 object SQLContextSingelton {
56   @transient private var instance: SQLContext = _
57 
58   def getSQLContext(sc: SparkContext): SQLContext = {
59     if (instance == null) {
60       synchronized[SQLContext] {
61         if (instance == null) {
62           instance = new SQLContext(sc)
63         }
64         instance
65       }
66     }
67     instance
68   }
69 }

 

4.效果

  

 

以上是关于071 SparkStreaming与SparkSQL集成的主要内容,如果未能解决你的问题,请参考以下文章

spark_sql_DataFromMysql_InferringSchema_SparkSqlSchema_SparkSqlToMysql_SparkStreaming_Flume_Poll(示例代

Kafka 遇上 Spark Streaming

Flex:MX + Sparks 与仅 MX 组件

Sparks enableHiveSupport [重复]

071算法与数据结构-综合提升 C++版

吉他弹唱 taylor swift《 sparks fly》!