Spark训练营-- SparkStreaming流计算组件wordCount实战
Posted DLab数据实验室
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark训练营-- SparkStreaming流计算组件wordCount实战相关的知识,希望对你有一定的参考价值。
、
导读:上一期介绍了Spark Core的方式读取本地文件进行wordCount的实例,spark Core的方式本质上属于批处理,举个简单的例子,一款App通过埋点的方式采集了用户点击各个功能的日志数据,通过跑批来统计一段时间内点击总量为topK(K>0) 的功能,就会用到这种方式来计算。
但是,如果想统计前一个小时,前半个小时,前10分钟甚至更细粒度的时间段内,该app点击量为topK的功能怎么半,难道要每隔一段时间启动一个新批任务来计算么,显然不是。这种处理模式就是流处理。
SparkStreaming是目前应用比较广泛的流处理框架(组件),它可以通过定义一个时间间隔来完成每隔多久进行一次计算。从以上的描述中,你可能也意识到了SparkStraming本质上还是属于批处理,但是当间隔时间越来越短,也就变成了微批,我们就可以粗略的看做为流处理啦。目前SparkStreaming可以做到秒级的时间间隔,已经能够满足大部分的流处理场景应用了。
SparkStreaming一般结合Kafka等消息队列相互协作,我们本期学习重点在于SparkStreaming,因此用netcat来模拟socket数据源进行SparkStreaming调试。
1.安装并启动netcat
netcat工具非常轻便容易安装,是一款非常好用的数据传输小工具。安装完成后运行下面命令开启服务端口:
nc -l 8888
2.创建SparkStreaming示例
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @title: WordCountSparkStreaming
* @projectName spark-test
* @description: 使用 netcat 工具向 8888 端口不断的发送数据,通过 SparkStreaming 读取端口数据并统计不同单词出现的次数。
* @author chengyijian
* @date 2020/8/2715:03
*/
object WordCountSparkStreaming {
def main(args: Array[String]): Unit = {
// 1、初始化 Spark 配置信息
val sparkConf: SparkConf = new SparkConf()
.setMaster("local[*]")
.set("spark.driver.host", "localhost")
.setAppName("sparkStreamWordCount")
// 2、初始化 SparkStreamingContext
val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(5))
val wordAndCountStreams = ssc.socketTextStream("localhost", 8888)
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
wordAndCountStreams.print()
ssc.start()
ssc.awaitTermination()
}
}
3.运行
3.1 在netcat启动的页面输入字符模拟数据产生过程
3.2 可以看到SparkStreaming每隔5s钟进行了实时计算
4.以上就是流处理等一般过程,也是最基本的流处理
本次的实例代码也已同步至Gitee,欢迎下载调试。
https://gitee.com/doubledue/sparktest
●
●
●
●
●
文章都看完了不点个 吗
欢迎 点赞、在看、分享 三连哦~~
以上是关于Spark训练营-- SparkStreaming流计算组件wordCount实战的主要内容,如果未能解决你的问题,请参考以下文章