SparkStreaming

Posted 阿德小仔

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkStreaming相关的知识,希望对你有一定的参考价值。

目录

SparkStreaming概述

SparkStreaming是什么

SparkStreaming特点

Spark Streaming架构

Dstream

Dstream入门

DStream 创建

DStream 转换

无状态转化操作

有状态转化操作

 自定义数据源

Kafka 数据源


SparkStreaming概述

画图引出 Spark Streaming 概念:

SparkStreaming是什么

SparkStreaming用于流式数据的处理。SparkStreaming支持的数据输入源有很多,例如:KafKa、Flume和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、 reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。

和Spark基于RDD的概念很相似,Spark Streaming使用离散化流(discretized stream)作为抽象表 示,叫作DStream。DStream是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”)。所以简单来将,DStream 就是对 RDD 在实时数据处理场景的一种封装。 注意:离散化流可以理解为不连续的流。
// 介绍一下什么是流式什么是批量: //1. 从数据处理的方式角度: // 流式( Streaming )数据处理 // 批量( batch )数据处理 //2. 从数据处理延迟的长短角度 // 实时数据处理:毫秒级别 // 离线数据处理:小时 or 天级别 //SparkStreaming 准实时(秒、分钟),微批次(时间)的数据处理框架。
画图介绍每个时间区间:

SparkStreaming特点

(1)易用 (2)统一的批处理和流式API (3)低延迟和成本效益

Spark Streaming架构

架构图: (1)整体架构图:

(2) SparkStreaming 架构图:

背压机制 Spark 1.5 以前版本,用户如果要限制 Receiver 的数据接收速率,可以通过设置静态配制参数 “spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer 数据生产高于 maxRate,当前集群处理能力也高于 maxRate,这就会造成资源利用率下降等问题。为了更好的协调数据接收速率与资源处理能力,1.5 版本开始 Spark Streaming 可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即 Spark Streaming Backpressure): 根据JobScheduler 反馈作业的执行信息来动态调整 Receiver 数据接收率。通过属性“spark.streaming.backpressure.enabled”来控制是否启用 backpressure 机制,默认值false,即不启用。

Dstream

与SpakrContext类似,StreamingContext是实时应用程序的入口,它充当应用程序与Spark引擎的连接纽带。DStream对于Spark Streaming的作用就如同RDD对于Spark的作用,DStream将潜在的无限数据流,转换成离散批处理的RDD。

Dstream入门

WordCount 案例实操 需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数。 (1)添加依赖
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-mllib --> <dependency> <groupId> org.apache.spark </groupId> <artifactId> spark-mllib_2.12 </artifactId> <version> 2.4.5 </version> </dependency>
(2)编写代码
package com.shujia.spark.test
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream, ReceiverInputDStream
import org.apache.spark.streaming.Seconds, StreamingContext
object DemoSpark01_WordCount 
def main(args: Array[String]): Unit = 
//TODO: 创建环境对象
//StreamingContext创建时,需要传递两个参数
//第一个参数表示环境配置
val sparkconf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("WordCount")
//第二个参数表示批量处理的周期(采集周期)
val ssc = new StreamingContext(sparkconf,Seconds(3))
//TODO 处理逻辑
//获取端口数据
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost",
9999)
val words: DStream[String] = lines.flatMap(_.split(" "))
val mapD: DStream[(String, Int)] = words.map((_, 1))
val resultD: DStream[(String, Int)] = mapD.reduceByKey(_ + _)
resultD.print()
//TODO 关闭环境
//由于SparkStreaming采集器时长期执行的任务,所以不能直接关闭
//如果main方法执行完毕,应用程序也会自动结束。所以不能让main执行完毕
// ssc.stop()
//1、启动采集器
ssc.start()
//2、等待采集器的关闭
ssc.awaitTermination()

//打开命令行窗口: //输入: //nc -lp 9999
WordCount 解析 DStream是 Spark Streaming 的基础抽象,代表持续性的数据流和经过各种 Spark 原语操作后的结果数据流。在内部实现上,DStream 是一系列连续的 RDD 来表示。每个 RDD 含有一段时间间隔内的数据。

对数据的操作也是按照 RDD 为单位来进行的:

计算过程由 Spark Engine 来完成:

DStream 创建

RDD 队列: 测试过程中,可以通过使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到这 个队列中的 RDD,都会作为一个 DStream 处理。 案例实操: 需求:循环创建几个 RDD,将 RDD 放入队列。通过 SparkStream 创建 Dstream,计算WordCount
package com.shujia.spark.test
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream, ReceiverInputDStream
import org.apache.spark.streaming.Seconds, StreamingContext
import scala.collection.mutable
object DemoSpark02_Queue 
def main(args: Array[String]): Unit = 
//TODO: 创建环境对象
//StreamingContext创建时,需要传递两个参数
//第一个参数表示环境配置
//1、初始化spark配置信息
val sparkconf: SparkConf = new SparkConf()
.setMaster("local[*]").setAppName("WordCount")
//第二个参数表示批量处理的周期(采集周期)
//2.初始化 SparkStreamingContext
val ssc = new StreamingContext(sparkconf,Seconds(3))
//TODO 处理逻辑
//3.创建 RDD 队列
val queue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]()
//4.创建 QueueInputDStream
val inputStream = ssc.queueStream(queue,oneAtATime = false)
//5.处理队列中的 RDD 数据
val mappedStream = inputStream.map((_,1))
val reducedStream = mappedStream.reduceByKey(_ + _)
//6.打印结果
reducedStream.print()
//TODO 关闭环境
//由于SparkStreaming采集器时长期执行的任务,所以不能直接关闭
//如果main方法执行完毕,应用程序也会自动结束。所以不能让main执行完毕
// ssc.stop()
//(1)启动采集器
ssc.start()
//8.循环创建并向 RDD 队列中放入 RDD
for (i <- 1 to 5) 
queue += ssc.sparkContext.makeRDD(1 to 300, 10)
Thread.sleep(2000)

//(2)等待采集器的关闭
ssc.awaitTermination()

package com.shujia.spark.test
import org.apache.commons.codec.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig, ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies, KafkaUtils,
LocationStrategies
import org.apache.spark.streaming.Seconds, StreamingContext
object DemoSpark05_Kafka 
def main(args: Array[String]): Unit = 
//TODO: 创建环境对象
//StreamingContext创建时,需要传递两个参数
//第一个参数表示环境配置
//1、初始化spark配置信息
val sparkconf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("WordCount")
//第二个参数表示批量处理的周期(采集周期)
//2.初始化 SparkStreamingContext
val ssc = new StreamingContext(sparkconf,Seconds(3))val kafkaParas: Map[String, Object] = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->
"master:9092,node1:9092,node2:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "shujia",
"key.deserializer" ->
"org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" ->
"org.apache.kafka.common.serialization.StringDeserializer"
)
//TODO 处理逻辑
val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] =
KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Set("shujia"),
kafkaParas)
)
kafkaDataDS.map(_.value()).print()
//(1)启动采集器
ssc.start()
//(2)等待采集器的关闭
ssc.awaitTermination()

DStream 转换

DStream 上的操作与 RDD 的类似,分为 Transformations(转换)和 Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及 各种 Window 相关的原语。

无状态转化操作

无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每 一个 RDD。部分无状态转化操作列在了下表中。注意,针对键值对的 DStream 转化操作(比如 reduceByKey())要添加 import StreamingContext._才能在 Scala 中使用。

需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个 DStream 在内部是由许多 RDD(批次)组成,且无状态转化操作是分别应用到每个 RDD 上的。例如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。 Transform Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream 的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。该函数每一批次调度一次。其实也就是 对 DStream 中的RDD 应用转换。 join 两个流之间的 join 需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当 前批次的两个流中各自的 RDD 进行 join,与两个 RDD 的 join 效果相同。

有状态转化操作

UpdateStateByKey UpdateStateByKey 原语用于记录历史记录,有时,我们需要在 DStream 中跨批次维护状态(例如流计算中累加 wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据(键,状态) 对。 updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对 应 的(键,状态)对组成的。 updateStateByKey 操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,需要做下面两步: 1、定义状态,状态可以是一个任意的数据类型。 2、定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。 使用 updateStateByKey 需要对检查点目录进行配置,会使用检查点来保存状态。  
package com.shujia.spark.test
import org.apache.kafka.clients.consumer.ConsumerConfig, ConsumerRecord
import org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.DStream, InputDStream,
ReceiverInputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies, KafkaUtils,
LocationStrategies
import org.apache.spark.streaming.Seconds, StreamingContext
object DemoSpark06_State 
def main(args: Array[String]): Unit = 
//TODO: 创建环境对象
val sparkconf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("WordCount")
val ssc = new StreamingContext(sparkconf,Seconds(3))
//无状态数据操作,只对当前的采集周期内的数据进行出来
//在某些场合下,需要保留数据统计结果(状态),实现数据的汇总
//使用有状态操作时,需要设定检查点路径
ssc.checkpoint("cp")
val datas: ReceiverInputDStream[String] =
ssc.socketTextStream("localhost", 9999)
val wordToOne: DStream[(String, Int)] = datas.map((_, 1))
// val wordCount: DStream[(String, Int)] = wordToOne.reduceByKey(_ + _)
//updateStateByKey:根据key对数据的状态进行更新
//传递的参数中含有两个值
//第一个值表示相同的key的value数据
//第二个值表示缓冲区相同key的value数据
val state = wordToOne.updateStateByKey(
(seq:Seq[Int],buff:Option[Int]) =>
val newCount: Int = buff.getOrElse(0) + seq.sum
Option(newCount)

)
state.print()
ssc.start()
ssc.awaitTermination()

 自定义数据源

用法及说明 需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。 案例实操 需求:自定义数据源,实现监控某个端口号,获取该端口号内容。
// 自定义数据源
package com.shujia.spark.test
import java.util.Random
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.Seconds, StreamingContextimport scala.collection.mutable
object DemoSpark03_Diy 
def main(args: Array[String]): Unit = 
//TODO: 创建环境对象
//StreamingContext创建时,需要传递两个参数
//第一个参数表示环境配置
//1、初始化spark配置信息
val sparkconf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("WordCount")
//第二个参数表示批量处理的周期(采集周期)
//2.初始化 SparkStreamingContext
val ssc = new StreamingContext(sparkconf,Seconds(3))
//TODO 处理逻辑
val messageDS: ReceiverInputDStream[String] = ssc.receiverStream(new
MyReceiver())
messageDS.print()
//(1)启动采集器
ssc.start()
//(2)等待采集器的关闭
ssc.awaitTermination()

/**
* 自定义数据采集器
* 1、继承Receiver,定义泛型,传递参数
* 2、重写方法
*/
class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY)
private var flg = true
override def onStart(): Unit = 
new Thread(new Runnable 
override def run(): Unit = 
while (flg)
val message ="采集的数据为:" + new Random().nextInt(10).toString
store(message)
Thread.sleep(500)


).start()

override def onStop(): Unit = 
flg = false


package com.shujia.spark.test
import java.io.BufferedReader, InputStreamReader
import java.net.Socket
import java.nio.charset.StandardCharsets
import java.util.Random
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.Seconds, StreamingContext
object DemoSpark04_Diy 
def main(args: Array[String]): Unit = 
//TODO: 创建环境对象
//StreamingContext创建时,需要传递两个参数
//第一个参数表示环境配置
//1、初始化spark配置信息
val sparkconf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("WordCount")
//第二个参数表示批量处理的周期(采集周期)
//2.初始化 SparkStreamingContext
val ssc = new StreamingContext(sparkconf,Seconds(3))
//TODO 处理逻辑
//创建自定义 receiver 的 Streaming
val lineStream: ReceiverInputDStream[String] = ssc.receiverStream(new
MyReceiver("localhost", 9999))
//将每一行数据做切分,形成一个个单词
val wordStream = lineStream.flatMap(_.split("\\t"))
//将单词映射成元组(
word,1)
val wordAndOneStream = wordStream.map((_, 1))
//将相同的单词次数做统计
val wordAndCountStream = wordAndOneStream.reduceByKey(_ + _)
//打印
wordAndCountStream.print()
//(1)启动采集器
ssc.start()
//(2)等待采集器的关闭
ssc.awaitTermination()

/**
* 自定义数据采集器
* 1、继承Receiver,定义泛型,传递参数
* 2、重写方法
*/
class MyReceiver(host:String,port:Int) extends Receiver[String]
(StorageLevel.MEMORY_ONLY)//最初启动的时候,调用该方法,作用为:读数据并将数据发送给spark
override def onStart(): Unit = 
new Thread("Socket Receiver")
override def run(): Unit = 
receiver()

.start()

//读取数据并将数据发送给spark
def receiver()=
//创建一个Socket
val socket = new Socket(host, port)
//定义一个变量,用来接收端口传过来的数据
var input:String = null
//创建一个BufferedReader用于读取端口传来的数据
val reader = new BufferedReader(new
InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
//读取数据
input = reader.readLine()
//当 receiver 没有关闭并且输入数据不为空,则循环发送数据给 Spark
while (!isStopped() && input != null) 
store(input)
input = reader.readLine()

//跳出循环则关闭资源
reader.close()
socket.close()
//重启任务
restart("restart")

override def onStop(): Unit = 


Kafka 数据源

版本选型 ReceiverAPI:需要一个专门的 Executor 去接收数据,然后发送给其他的 Executor 做计算。存在 的问题,接收数据的 Executor 和计算的 Executor 速度会有所不同,特别在接收数据的 Executor速度大于计算的Executor 速度,会导致计算数据的节点内存溢出。早期版本中提供此方式,当前版本不适用。 DirectAPI:是由计算的 Executor 来主动消费 Kafka 的数据,速度由自身控制。 Kafka 0-10 Direct 模式 需求:通过 SparkStreaming 从 Kafka 读取数据,并将读取过来的数据做简单计算,最终打印到控制台。导入依赖:
<dependency> <groupId> org.apache.spark </groupId> <artifactId> spark-streaming-kafka-0-10_2.12 </artifactId> <version> 2.4.5 </version> </dependency> <dependency> <groupId> com.fasterxml.jackson.core </groupId> <artifactId> jackson-core </artifactId> <version> 2.10.1 </version> </dependency>
1、上传压缩包到任意节点 2、解压,配置环境变量 所有节点都配置 3、修改config/server.properties 1、broker.id=0,每一个节点broker.id 要不一样 2、zookeeper.connect=master:2181,node1:2181,node2:2181 3、log.dirs=/usr/local/soft/kafka_2.11-1.0.0/data 消息存放的位置 4、复制到其它节点
scp -r kafka_2.11-1.0.0 node2: `pwd` scp -r kafka_2.11-1.0.0 node1: `pwd`
5、修改每个节点的broker.id master=0 node1=1 node2=2 6、启动 1、启动zookeeper, 需要在所有节点启动
zkServer.sh start
查看状态
zkServer.sh status
3,在每台节点启动broker, kafka是去中心化的架构 -daemon 后台启动 在所有节点 启动
kafka-server-start.sh -daemon /usr/local/soft/kafka_2.11-1.0.0/config/server.properties
1、创建topic --replication-factor ---每一个分区的副本数量 --partition --分区数, 根据数据量设置
kafka-topics.sh --create --zookeeper master:2181,node1:2181,node2:2181 --replication-factor 2 --partitions 3 --topic test_topic1
2、查看topic描述信息
kafka-topics.sh --describe --zookeeper master:2181,node1:2181,node2:2181 --topic test_topic1
3、获取所有topic
kafka-topics.sh --list --zookeeper master:2181,node1:2181,node2:2181
4、创建控制台生产者
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic test_topic1
5、创建控制台消费者 --from-beginning 从头消费,, 如果不在执行消费的新的数据
kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic test_topic1
#启动kafka
bin/kafka-server-start.sh -daemon /usr/local/soft/kafka/config/server.properties
#创建topic
bin/kafka-topics.sh --create --zookeeper master:2181,node1:2181,node2:2181 --replication-factor 2 --partitions 3 --topic shujia
#查看topic信息
bin/kafka-topics.sh --describe --zookeeper master:2181,node1:2181,node2:2181 --topic kong
#显示所有topic
bin/kafka-topics.sh --list --zookeeper master:2181,node1:2181,node2:2181
#创建生产者
bin/kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic kong

以上是关于SparkStreaming的主要内容,如果未能解决你的问题,请参考以下文章