spark streaming 一个批次取多少数据

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark streaming 一个批次取多少数据相关的知识,希望对你有一定的参考价值。

参考技术A 首先以一个简单的示例开始:用Spark Streaming对从TCP连接中接收的文本进行单词计数。
/**
* 功能:用spark streaming实现的针对流式数据进行单词计数的程序。
* 该程序只是对数据流中的每一批数据进行单独的计数,而没有进行增量计数。
* 环境:spark 1.6.1, scala 2.10.4
*/

// 引入相关类库
import org.apache.spark._
import org.apache.spark.streaming._

object NetworkWordCount
def main(args: Array[String])
// Spark Streaming程序以StreamingContext为起点,其内部维持了一个SparkContext的实例。
// 这里我们创建一个带有两个本地线程的StreamingContext,并设置批处理间隔为1秒。
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// 在一个Spark应用中默认只允许有一个SparkContext,默认地spark-shell已经为我们创建好了
// SparkContext,名为sc。因此在spark-shell中应该以下述方式创建StreamingContext,以
// 避免创建再次创建SparkContext而引起错误:
// val ssc = new StreamingContext(sc, Seconds(1))

// 创建一个从TCP连接获取流数据的DStream,其每条记录是一行文本
val lines = ssc.socketTextStream("localhost", 9999)

// 对DStream进行转换,最终得到计算结果
val res = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

// 打印该DStream中每个RDD中的前十个元素
res.print()

// 执行完上面代码,Spark Streaming并没有真正开始处理数据,而只是记录需在数据上执行的操作。
// 当我们设置好所有需要在数据上执行的操作以后,我们就可以开始真正地处理数据了。如下:
ssc.start() // 开始计算
ssc.awaitTermination() // 等待计算终止



为了测试程序,我们得有TCP数据源作为输入,这可以使用Netcat(一般linux系统中都有,如果是windows系统,则推荐你使用 Ncat ,Ncat是一个改进版的Netcat)。如下使用Netcat监听指定本地端口:
nc -lk 9999

如果是使用Ncat,则对应命令如下:
ncat -lk 9999

在IntelliJ IDEA或Eclipse中可以本地运行测试上述Spark Streaming程序,该程序会连接到Netcat(或Ncat)监听的端口,你可以在运行Netcat(或Ncat)的终端中输入东东并回车,然后就可以看到该Spark Streaming程序会马上输出处理结果,并且这个处理是不停的、流式的。
注意:上述示例只是对数据流中的每一批数据进行单独的计数,而没有进行增量计数。本回答被提问者采纳

Spark Streaming 2.2.0 性能调优

传送门:Spark 系统性学习笔记


Spark 版本:2.2.0

Spark Streaming 应用程序要获得最佳性能需要做一些调整优化。这篇文章我们介绍可以提高你应用程序性能的参数以及配置。从高层次来看,你需要关心两件事情:

  • 通过充分利用集群资源,减少每批次数据的处理时间。
  • 设置合理的批次大小,从而尽可能快的处理每批次的数据,即数据处理速度与数据接收速度保持一致。

1. 减少每批次的处理时间

在 Spark 中可以进行许多优化来减少每批次的处理时间。这些已在 Tuning Guide 中详细讨论。在这重点介绍了一些最重要的优化点。

1.1 提升数据接收的并行度

通过网络接收数据(如Kafka,Flume,Socket等)需要将数据反序列化并存储在 Spark 中。如果数据接收成为系统的瓶颈,则需要考虑并行化接收数据。

1.1.1 提升 Receiver 的并发度

每一个输入 DStream 都会创建一个 Receiver(运行在 Worker 节点上&

以上是关于spark streaming 一个批次取多少数据的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark Streaming 中仅在新批次上重新训练模型(不采用以前的训练数据集)?

Spark Streaming 2.2.0 性能调优

Spark Streaming 2.2.0 性能调优

Spark Streaming:长队列/活动批次

在 Spark Streaming 中,如何检测空批次?

再谈Spark Streaming Kafka反压