Spark Streaming快速入门之WordCount
Posted Mr.zhou_Zxy
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark Streaming快速入门之WordCount相关的知识,希望对你有一定的参考价值。
Spark Streaming快速入门之WordCount
Consumer
package com.zxy.spark.Streaming
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
object wordCount {
def main(args: Array[String]): Unit = {
// 创建配置文件对象,一般设计local[*],即有多少用多少
val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
// 创建Spark Streaming对象
val scc = new StreamingContext(conf, Seconds(3))
// 从端口中获取数据源,这个9999端口就是与Linux端的Producer数据传输的端口
val socketDS: ReceiverInputDStream[String] = scc.socketTextStream("192.168.130.110", 9999)
val reduceDS: DStream[(String, Int)] = socketDS.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)
/**
* 对上步操作细化
*
* // 对获取到的数据进行扁平化处理,即把输入的数据以空格为切割方式
* val flatMapDS: DStream[String] = socketDS.flatMap(_.split(" "))
*
* // 对数据进行结构上的转换
* val mapDS: DStream[(String, Int)] = flatMapDS.map((_, 1))
*
* // 对上述的数据进行聚合处理
* val reduceDS: DStream[(String, Int)] = mapDS.reduceByKey(_ + _)
*/
// 输出结果 注意:调用的是 DS的 print 函数
reduceDS.print()
// 启动采集器
scc.start()
// 默认情况下,上下文对象不能关闭
// scc.stop()
// 等待采集器结束,终止上下文环境对象
scc.awaitTermination()
}
}
POM
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>1.1.1</version>
</dependency>
tips:
provided
如果在依赖中加入了这一行,会出现这种问题
NoClassDefFoundError: org/apache/spark/streaming/StreamingContext
这时候只需要删除这一行就可以
Producer
端口传输主要有三种方式:
nc nmap telnet
[root@hadoop ~]# yum install -y nc
[root@hadoop ~]# nc -lk 9999
然后就可以在这边发送数据,在Consumer端接收数据,并进行WordCount统计,Consumer端每3S都会统计一次,不管这边有没有发送数据
Time: 1624336719000 ms
(hive,1)
(word,1)
(hello,4)
(java,1)
(spark,1)
以上是关于Spark Streaming快速入门之WordCount的主要内容,如果未能解决你的问题,请参考以下文章
Note_Spark_Day11:Spark Streaming
Spark StreamingSpark Day11:Spark Streaming 学习笔记
Spark StreamingSpark Day11:Spark Streaming 学习笔记