Ephemeral Spark Streaming..以编程方式关闭

Posted

技术标签:

【中文标题】Ephemeral Spark Streaming..以编程方式关闭【英文标题】:Ephemeral Spark Streaming..Shutdown programmatically 【发布时间】:2018-03-07 00:10:33 【问题描述】:

背景

我是 Spark 流媒体的新手,对 scala 和 spark 还很陌生。

我有一个 java 大数据包装应用程序,它接受数据输入并生成 libsvm 和/或 csv 格式的数据。这个应用是独立于 spark 的。

我正在开发的功能允许java应用程序打开一个套接字,连接到spark主节点上的springboot应用程序,指示应用程序打开一个spark流,然后将其数据流式传输到spark。

数据流式传输后,Java 应用程序将关闭。

大部分工作正常,但我无法关闭 spark 流上下文,所以一旦 java 端关闭,我就会不停

ERROR ReceiverTracker:取消注册流 0 的接收器:重新启动 延迟 2000 毫秒的接收器:连接被拒绝

DStream 读取文件结束信号。我已经确认收到并解析了

问题

但是,尽管阅读了文档,但我无法找到以编程方式关闭 StreamingContext 的方法。确实,我在网上看到StreamingContext.stop(true, true) 会导致问题。

我的代码如下。任何帮助将不胜感激。

(注意:logger.info("Stopping") 永远不会记录到文件中)

var:stop=false;

@throws(classOf[IKodaMLException])
def  startStream(ip:String,port:Int):Unit=

 try 
  val ssc = getSparkStreamingContext(fieldVariables)
  ssc.checkpoint("./ikoda/cp")

  val lines = ssc.socketTextStream(ip, port, StorageLevel.MEMORY_AND_DISK_SER)
  lines.print

  val lmap=lines.map
  
    l =>
      if(l.contains("IKODA_END_STREAM"))
        
          stop=true;
          
        

      .....do stuff and return processed line
  

 if(stop)
    
      logger.info("Stopping")
      ssc.stop(true,true)
    

  
    lmap.foreachRDD 
      r =>
        if(r.count() >0) 
          .......do more stufff
        
        else
          
            logger.info("Empty RDD. No data received")
          
    
  ssc.start()
  ssc.awaitTermination()

【问题讨论】:

【参考方案1】:

更新:这个答案似乎在事实上是正确的,但在概念上是错误的。我认为提供了更好的答案in this post


这个答案有点初步。它回答了问题,但没有解决问题。欢迎提供其他意见。

首先,文档说明以编程方式关闭是可以的。但是,我注意到在关闭之前会引发一两个与连接相关的异常。然而,即使 SparkContext 被告知与流一起关闭,它似乎也没有这样做。所以以编程方式关闭似乎是不明智的。除非我可以重新开始直播,否则该项目没有实际意义。

其次,流式传输期间唯一应用于 StreamingContext 的代码是直接引用 DSTream 的代码,因此显然上述代码中的 stop() 调用(上述问题)是错误的。

李>

第三,流(据我所知)确实发生在驱动程序上。因此可以创建字段变量,并在 DStream 循环、映射等中引用。

可以创建一个线程来监视关闭调用作为字段级布尔值。然后调用 Streaming Context 并关闭。

线程

var stopScc=false

private def stopSccThread(): Unit = 
val thread = new Thread 
  override def run 

    var continueRun=true
    while (continueRun) 
      logger.debug("Checking status")
      if (stopScc == true) 
        getSparkStreamingContext(fieldVariables).stop(true, true)
        logger.info("Called Stop on Streaming Context")
        continueRun=false


      
      Thread.sleep(50)
    
  

thread.start


溪流

@throws(classOf[IKodaMLException])
def startStream(ip: String, port: Int): Unit = 

try 
  val ssc = getSparkStreamingContext(fieldVariables)
  ssc.checkpoint("./ikoda/cp")

  val lines = ssc.socketTextStream(ip, port, StorageLevel.MEMORY_AND_DISK_SER)
  lines.print


  val lmap = lines.map 
    l =>

      if (l.contains("IKODA_END_STREAM")) 
        stopScc = true
      
      l
  


  lmap.foreachRDD 
    r =>
      if (r.count() > 0) 
        logger.info(s"RECEIVED: $r.toString() first: $r.first().toString")
        r.saveAsTextFile("./ikoda/test/test")
      
      else 
        logger.info("Empty RDD. No data received")
      
  
  ssc.start()

  ssc.awaitTermination()

catch 
  case e: Exception =>
    logger.error(e.getMessage, e)
    throw new IKodaMLException(e.getMessage, e)

【讨论】:

以上是关于Ephemeral Spark Streaming..以编程方式关闭的主要内容,如果未能解决你的问题,请参考以下文章

spark streaming kafka example

.Spark Streaming(上)--实时流计算Spark Streaming原理介

Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一

Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一

Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一

Spark SQL + Window + Streaming 问题 - 使用 Spark Streaming 运行时,Spark SQL 查询需要很长时间才能执行