Spark Streaming快速入门之WordCount

Posted Mr.zhou_Zxy

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark Streaming快速入门之WordCount相关的知识,希望对你有一定的参考价值。

Spark Streaming快速入门之WordCount

Spark Streaming快速入门官网

Consumer

package com.zxy.spark.Streaming

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object wordCount {
    def main(args: Array[String]): Unit = {
        // 创建配置文件对象,一般设计local[*],即有多少用多少
        val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    
        // 创建Spark Streaming对象
        val scc = new StreamingContext(conf, Seconds(3))
    
        // 从端口中获取数据源,这个9999端口就是与Linux端的Producer数据传输的端口
        val socketDS: ReceiverInputDStream[String] = scc.socketTextStream("192.168.130.110", 9999)
        
    
        val reduceDS: DStream[(String, Int)] = socketDS.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)
    
        /**
         * 对上步操作细化
         * 
         * // 对获取到的数据进行扁平化处理,即把输入的数据以空格为切割方式
         * val flatMapDS: DStream[String] = socketDS.flatMap(_.split(" "))
         *
         * // 对数据进行结构上的转换
         * val mapDS: DStream[(String, Int)] = flatMapDS.map((_, 1))
         *
         * // 对上述的数据进行聚合处理
         * val reduceDS: DStream[(String, Int)] = mapDS.reduceByKey(_ + _)
         */
        // 输出结果      注意:调用的是 DS的 print 函数
        reduceDS.print()
    
        // 启动采集器
        scc.start()
    
        // 默认情况下,上下文对象不能关闭
        // scc.stop()
    
        // 等待采集器结束,终止上下文环境对象
        scc.awaitTermination()
        
    }
}

POM

		<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>1.1.1</version>
        </dependency>

tips:

provided
如果在依赖中加入了这一行,会出现这种问题
NoClassDefFoundError: org/apache/spark/streaming/StreamingContext
这时候只需要删除这一行就可以

Producer

端口传输主要有三种方式:
nc nmap telnet

[root@hadoop ~]# yum install -y nc
[root@hadoop ~]# nc -lk 9999
然后就可以在这边发送数据,在Consumer端接收数据,并进行WordCount统计,Consumer端每3S都会统计一次,不管这边有没有发送数据


Time: 1624336719000 ms

(hive,1)
(word,1)
(hello,4)
(java,1)
(spark,1)

在这里插入图片描述

以上是关于Spark Streaming快速入门之WordCount的主要内容,如果未能解决你的问题,请参考以下文章

Spark Streaming入门详解

Note_Spark_Day11:Spark Streaming

Spark StreamingSpark Day11:Spark Streaming 学习笔记

Spark StreamingSpark Day11:Spark Streaming 学习笔记

流式计算助力实时数据处理spark-streaming入门实战

Spark Streaming入门