Ephemeral Spark Streaming..以编程方式关闭
Posted
技术标签:
【中文标题】Ephemeral Spark Streaming..以编程方式关闭【英文标题】:Ephemeral Spark Streaming..Shutdown programmatically 【发布时间】:2018-03-07 00:10:33 【问题描述】:背景
我是 Spark 流媒体的新手,对 scala 和 spark 还很陌生。
我有一个 java 大数据包装应用程序,它接受数据输入并生成 libsvm 和/或 csv 格式的数据。这个应用是独立于 spark 的。
我正在开发的功能允许java应用程序打开一个套接字,连接到spark主节点上的springboot应用程序,指示应用程序打开一个spark流,然后将其数据流式传输到spark。
数据流式传输后,Java 应用程序将关闭。
大部分工作正常,但我无法关闭 spark 流上下文,所以一旦 java 端关闭,我就会不停
ERROR ReceiverTracker:取消注册流 0 的接收器:重新启动 延迟 2000 毫秒的接收器:连接被拒绝
DStream 读取文件结束信号。我已经确认收到并解析了
问题
但是,尽管阅读了文档,但我无法找到以编程方式关闭 StreamingContext 的方法。确实,我在网上看到StreamingContext.stop(true, true)
会导致问题。
我的代码如下。任何帮助将不胜感激。
(注意:logger.info("Stopping")
永远不会记录到文件中)
var:stop=false;
@throws(classOf[IKodaMLException])
def startStream(ip:String,port:Int):Unit=
try
val ssc = getSparkStreamingContext(fieldVariables)
ssc.checkpoint("./ikoda/cp")
val lines = ssc.socketTextStream(ip, port, StorageLevel.MEMORY_AND_DISK_SER)
lines.print
val lmap=lines.map
l =>
if(l.contains("IKODA_END_STREAM"))
stop=true;
.....do stuff and return processed line
if(stop)
logger.info("Stopping")
ssc.stop(true,true)
lmap.foreachRDD
r =>
if(r.count() >0)
.......do more stufff
else
logger.info("Empty RDD. No data received")
ssc.start()
ssc.awaitTermination()
【问题讨论】:
【参考方案1】:更新:这个答案似乎在事实上是正确的,但在概念上是错误的。我认为提供了更好的答案in this post
这个答案有点初步。它回答了问题,但没有解决问题。欢迎提供其他意见。
首先,文档说明以编程方式关闭是可以的。但是,我注意到在关闭之前会引发一两个与连接相关的异常。然而,即使 SparkContext 被告知与流一起关闭,它似乎也没有这样做。所以以编程方式关闭似乎是不明智的。除非我可以重新开始直播,否则该项目没有实际意义。
其次,流式传输期间唯一应用于 StreamingContext 的代码是直接引用 DSTream 的代码,因此显然上述代码中的 stop() 调用(上述问题)是错误的。
李>第三,流(据我所知)确实发生在驱动程序上。因此可以创建字段变量,并在 DStream 循环、映射等中引用。
可以创建一个线程来监视关闭调用作为字段级布尔值。然后调用 Streaming Context 并关闭。
线程
var stopScc=false
private def stopSccThread(): Unit =
val thread = new Thread
override def run
var continueRun=true
while (continueRun)
logger.debug("Checking status")
if (stopScc == true)
getSparkStreamingContext(fieldVariables).stop(true, true)
logger.info("Called Stop on Streaming Context")
continueRun=false
Thread.sleep(50)
thread.start
溪流
@throws(classOf[IKodaMLException])
def startStream(ip: String, port: Int): Unit =
try
val ssc = getSparkStreamingContext(fieldVariables)
ssc.checkpoint("./ikoda/cp")
val lines = ssc.socketTextStream(ip, port, StorageLevel.MEMORY_AND_DISK_SER)
lines.print
val lmap = lines.map
l =>
if (l.contains("IKODA_END_STREAM"))
stopScc = true
l
lmap.foreachRDD
r =>
if (r.count() > 0)
logger.info(s"RECEIVED: $r.toString() first: $r.first().toString")
r.saveAsTextFile("./ikoda/test/test")
else
logger.info("Empty RDD. No data received")
ssc.start()
ssc.awaitTermination()
catch
case e: Exception =>
logger.error(e.getMessage, e)
throw new IKodaMLException(e.getMessage, e)
【讨论】:
以上是关于Ephemeral Spark Streaming..以编程方式关闭的主要内容,如果未能解决你的问题,请参考以下文章
.Spark Streaming(上)--实时流计算Spark Streaming原理介
Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一
Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一
Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一
Spark SQL + Window + Streaming 问题 - 使用 Spark Streaming 运行时,Spark SQL 查询需要很长时间才能执行