重新启动火花流应用程序的最佳方法是啥?

Posted

技术标签:

【中文标题】重新启动火花流应用程序的最佳方法是啥?【英文标题】:What is the best way to restart spark streaming application?重新启动火花流应用程序的最佳方法是什么? 【发布时间】:2017-01-18 13:52:08 【问题描述】:

我基本上想在我的驱动程序中编写一个事件回调,它将在该事件到达时重新启动 spark 流应用程序。 我的驱动程序通过从文件中读取配置来设置流和执行逻辑。 每当文件更改(添加新配置)时,驱动程序必须按顺序执行以下步骤,

    重启, 读取配置文件(作为主要方法的一部分)和 设置流

实现这一目标的最佳方法是什么?

【问题讨论】:

【参考方案1】:

在某些情况下,您可能希望动态地重新加载流式上下文(例如重新加载流式操作)。 在这种情况下,您可以(Scala 示例):

val sparkContext = new SparkContext()

val stopEvent = false
var streamingContext = Option.empty[StreamingContext]
val shouldReload = false

val processThread = new Thread 
  override def run(): Unit = 
    while (!stopEvent)
      if (streamingContext.isEmpty) 

        // new context
        streamingContext = Option(new StreamingContext(sparkContext, Seconds(1)))

        // create DStreams
          val lines = streamingContext.socketTextStream(...)

        // your transformations and actions
        // and decision to reload streaming context
        // ...

        streamingContext.get.start()
       else 
        if (shouldReload) 
          streamingContext.get.stop(stopSparkContext = false, stopGracefully = true)
          streamingContext.get.awaitTermination()
          streamingContext = Option.empty[StreamingContext]
         else 
          Thread.sleep(1000)
        
      

    
    streamingContext.get.stop(stopSparkContext =true, stopGracefully = true)
    streamingContext.get.awaitTermination()
  


// and start it  in separate thread
processThread.start()
processThread.join()

或在 python 中:

spark_context = SparkContext()

stop_event = Event()
spark_streaming_context = None
should_reload = False

def process(self):
    while not stop_event.is_set():
        if spark_streaming_context is None:

            # new context
            spark_streaming_context = StreamingContext(spark_context, 0.5)

            # create DStreams
            lines = spark_streaming_context.socketTextStream(...)  

            # your transformations and actions
            # and decision to reload streaming context
            # ...

            self.spark_streaming_context.start()
        else:
            # TODO move to config
            if should_reload:
                spark_streaming_context.stop(stopSparkContext=False, stopGraceFully=True)
                spark_streaming_context.awaitTermination()
                spark_streaming_context = None
            else:
                time.sleep(1)
    else:
        self.spark_streaming_context.stop(stopGraceFully=True)
        self.spark_streaming_context.awaitTermination()


# and start it  in separate thread
process_thread = threading.Thread(target=process)
process_thread.start()
process_thread.join()

如果您想防止代码崩溃并从最后一个位置重新启动流上下文,请使用checkpointing 机制。 它允许您在失败后恢复您的作业状态。

【讨论】:

在 scala 中尝试了类似的方法,似乎效果很好【参考方案2】:

重启Spark的最佳方式实际上是根据你的环境。但总是建议使用spark-submit控制台。

您可以像任何其他linux 进程一样将spark-submit 进程置于后台,方法是将其置于shell 的后台。在您的情况下,spark-submit 作业实际上然后在YARN 上运行驱动程序,因此,它是一个已经通过YARN 在另一台机器上异步运行的进程。

Cloudera blog

【讨论】:

【参考方案3】:

我们最近探索的一种方法(在此处的 spark 聚会中)是通过使用 Zookeeper in Tandem with Spark 来实现这一点。简而言之,这使用 Apache Curator 来监视 Zookeeper 上的更改(ZK 配置的更改,这可以由您的外部事件触发),然后导致侦听器重新启动。

引用的代码库是 here ,您会发现配置中的更改会导致 Watcher(一个 Spark 流应用程序)在正常关闭并重新加载更改后重新启动。希望这是一个指针!

【讨论】:

【参考方案4】:

我目前正在解决这个问题,

通过订阅 MQTT 主题收听外部事件

在 MQTT 回调中,停止流上下文 ssc.stop(true,true),这将正常关闭流和底层 火花配置

通过创建 spark conf 和 通过读取配置文件来设置流

// Contents of startSparkApplication() method
sparkConf = new SparkConf().setAppName("SparkAppName")
ssc = new StreamingContext(sparkConf, Seconds(1))
val myStream = MQTTUtils.createStream(ssc,...)   //provide other options
myStream.print()
ssc.start()

应用程序被构建为 Spring boot 应用程序

【讨论】:

【参考方案5】:

在 Scala 中,停止 sparkStreamingContext 可能涉及停止 SparkContext。我发现当一个receiver挂掉时,最好重启SparkCintext和SparkStreamingContext。

我确信下面的代码可以写得更优雅,但它允许以编程方式重新启动 SparkContext 和 SparkStreamingContext。完成此操作后,您也可以通过编程方式重新启动接收器。

    package coname.utilobjects

import com.typesafe.config.ConfigFactory
import grizzled.slf4j.Logging
import coname.conameMLException
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.Seconds, StreamingContext

import scala.collection.mutable


object SparkConfProviderWithStreaming extends Logging

  val sparkVariables: mutable.HashMap[String, Any] = new mutable.HashMap




trait SparkConfProviderWithStreaming extends Logging






  private val keySSC = "SSC"
  private val keyConf = "conf"
  private val keySparkSession = "spark"


  lazy val   packagesversion=ConfigFactory.load("streaming").getString("streaming.cassandraconfig.packagesversion")
  lazy val   sparkcassandraconnectionhost=ConfigFactory.load("streaming").getString("streaming.cassandraconfig.sparkcassandraconnectionhost")
  lazy val   sparkdrivermaxResultSize=ConfigFactory.load("streaming").getString("streaming.cassandraconfig.sparkdrivermaxResultSize")
  lazy val   sparknetworktimeout=ConfigFactory.load("streaming").getString("streaming.cassandraconfig.sparknetworktimeout")


  @throws(classOf[conameMLException])
  def intitializeSpark(): Unit =
  
    getSparkConf()
    getSparkStreamingContext()
    getSparkSession()
  

  @throws(classOf[conameMLException])
  def getSparkConf(): SparkConf = 
    try 
      if (!SparkConfProviderWithStreaming.sparkVariables.get(keyConf).isDefined) 
        logger.info("\n\nLoading new conf\n\n")
        val conf = new SparkConf().setMaster("local[4]").setAppName("MLPCURLModelGenerationDataStream")
        conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
        conf.set("spark.cassandra.connection.host", sparkcassandraconnectionhost)
        conf.set("spark.driver.maxResultSize", sparkdrivermaxResultSize)
        conf.set("spark.network.timeout", sparknetworktimeout)


        SparkConfProviderWithStreaming.sparkVariables.put(keyConf, conf)
        logger.info("Loaded new conf")
        getSparkConf()
      
      else 
        logger.info("Returning initialized conf")
        SparkConfProviderWithStreaming.sparkVariables.get(keyConf).get.asInstanceOf[SparkConf]
      
    
    catch 
      case e: Exception =>
        logger.error(e.getMessage, e)
        throw new conameMLException(e.getMessage)
    

  

  @throws(classOf[conameMLException])
def killSparkStreamingContext
  
    try
    
      if(SparkConfProviderWithStreaming.sparkVariables.get(keySSC).isDefined)
        
          SparkConfProviderWithStreaming.sparkVariables -= keySSC
          SparkConfProviderWithStreaming.sparkVariables -= keyConf
        
      SparkSession.clearActiveSession()
      SparkSession.clearDefaultSession()

    
    catch 
      case e: Exception =>
        logger.error(e.getMessage, e)
        throw new conameMLException(e.getMessage)
    
  

  @throws(classOf[conameMLException])
  def getSparkStreamingContext(): StreamingContext = 
    try 
      if (!SparkConfProviderWithStreaming.sparkVariables.get(keySSC).isDefined) 
        logger.info("\n\nLoading new streaming\n\n")
        SparkConfProviderWithStreaming.sparkVariables.put(keySSC, new StreamingContext(getSparkConf(), Seconds(6)))

        logger.info("Loaded streaming")
        getSparkStreamingContext()
      
      else 
        SparkConfProviderWithStreaming.sparkVariables.get(keySSC).get.asInstanceOf[StreamingContext]
      
    
    catch 
      case e: Exception =>
        logger.error(e.getMessage, e)
        throw new conameMLException(e.getMessage)
    
  

  def getSparkSession():SparkSession=
  

    if(!SparkSession.getActiveSession.isDefined)
    
      SparkSession.builder.config(getSparkConf()).getOrCreate()

    
    else
      
        SparkSession.getActiveSession.get
      
  


【讨论】:

以上是关于重新启动火花流应用程序的最佳方法是啥?的主要内容,如果未能解决你的问题,请参考以下文章

我继承了 UIView。它的一些子视图是动画的。重新启动这些的正确方法是啥?

从 SSH 重新启动 Dokku 应用程序的正确命令是啥?

回到脚本开头的最佳方法是啥?

如何自动强制关闭excel,然后重新启动?

有人可以建议使用火花流进行日志分析的最佳方法吗

AWS EC2-Instance 备份/终止/启动的最佳实践是啥