SparkStreaming---SparkSQL
Posted Shall潇
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkStreaming---SparkSQL相关的知识,希望对你有一定的参考价值。
如何在 Spark Streaming 中使用 Spark SQL
【目的:当业务逻辑直接用 RDD 算子实现比较繁琐时,可以在每个批次中改用 Spark SQL 来处理】
package window
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
/**
 * Demonstrates running Spark SQL inside a Spark Streaming job.
 *
 * Every 2-second micro-batch of Kafka records from the `sparkKafka` topic is
 * split into whitespace-separated words. The words are turned into a DataFrame,
 * registered as the temp view `wc`, and aggregated with a SQL `GROUP BY`.
 * The per-batch word counts are printed to stdout.
 */
object SparkStreamingSQL {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Windows")
    val streamingContext = new StreamingContext(conf, Seconds(2))
    // Checkpoint directory. No stateful/windowed operator below currently needs it.
    streamingContext.checkpoint("checkpoint")

    val kafkaParams = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.XXX.100:9092",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      // ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
      ConsumerConfig.GROUP_ID_CONFIG -> "kafkaGroup4"
    )

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        streamingContext,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("sparkKafka"), kafkaParams)
      )

    // Run Spark SQL on each micro-batch. The transform function must return an RDD.
    val resultStream: DStream[Row] = kafkaStream.transform { rdd =>
      // Reuse one cached SparkSession instead of building a new one per batch.
      val spark: SparkSession = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)

      // "\\s+" in Scala source is the regex \s+ (one or more whitespace characters).
      // A null record value (e.g. a Kafka tombstone) is treated as an empty line.
      // Empty tokens (from blank records or leading whitespace) are dropped so that
      // the empty string is never counted as a word.
      val wordPairs: RDD[(String, Int)] = rdd
        .flatMap(record => Option(record.value()).getOrElse("").split("\\s+"))
        .filter(_.nonEmpty)
        .map(word => (word, 1))

      // Required for the rdd.toDF(...) conversion below.
      import spark.implicits._
      val frame: DataFrame = wordPairs.toDF("word", "number")
      frame.createOrReplaceTempView("wc")
      val counts: DataFrame = spark.sql("select word,sum(number) from wc group by word")
      // transform must yield an RDD, so convert the DataFrame back.
      counts.rdd
    }

    resultStream.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
/**
 * Lazily creates and caches a single SparkSession so that every streaming
 * micro-batch reuses the same session.
 */
object SparkSessionSingleton {
  // Cached session. Marked @transient so it is never serialized.
  @transient private var instance: SparkSession = _

  /**
   * Returns the cached SparkSession, building it on the first call.
   *
   * @param sparkConf configuration applied when the session is first built.
   *                  Later calls return the cached session unchanged.
   * @return the shared SparkSession
   */
  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      // Apply the caller's configuration. getOrCreate reuses an already
      // running SparkContext if one exists.
      instance = SparkSession.builder().config(sparkConf).getOrCreate()
    }
    instance
  }
}
以上是关于SparkStreaming---SparkSQL的主要内容,如果未能解决你的问题,请参考以下文章