如何停止火花流作业?

Posted

技术标签:

【中文标题】如何停止火花流作业?【英文标题】:How do I stop a spark streaming job? 【发布时间】:2015-12-11 12:18:51 【问题描述】:

我有一个连续运行的 Spark Streaming 作业。如何优雅地停止工作?我已经阅读了在作业监视中附加关闭挂钩并向作业发送 SIGTERM 的通常建议。

sys.ShutdownHookThread 
  logger.info("Gracefully stopping Application...")
  ssc.stop(stopSparkContext = true, stopGracefully = true)
  logger.info("Application stopped gracefully")

它似乎有效,但看起来不是停止工作的最干净的方法。我在这里错过了什么吗?

从代码的角度来看,这可能是有道理的,但您如何在集群环境中使用它?如果我们启动一个 spark 流式作业(我们将作业分布在集群中的所有节点上),我们将必须跟踪作业的 PID 和它运行的节点。最后,当我们必须停止进程时,我们需要跟踪作业在哪个节点上运行以及它的 PID。我只是希望有一种更简单的流式作业控制方法。

【问题讨论】:

你觉得什么不干净?我认为这是正确的。 我在问题中添加了更多细节。 【参考方案1】:

您可以通过运行以下命令来停止集群模式下的流式传输上下文,而无需发送 SIGTERM。这将停止流上下文,而您无需使用线程挂钩显式停止它。

$SPARK_HOME_DIR/bin/spark-submit --master $MASTER_REST_URL --kill $DRIVER_ID

-$MASTER_REST_URL 是 spark 驱动的 rest url,比如 spark://localhost:6066

-$DRIVER_ID 类似于 driver-20150915145601-0000

如果您希望 spark 优雅地停止您的应用程序,您可以在最初提交您的 spark 应用程序时尝试设置以下系统属性(请参阅 http://spark.apache.org/docs/latest/submitting-applications.html 设置 spark 配置属性)。

spark.streaming.stopGracefullyOnShutdown=true

这没有正式记录,我是通过查看 1.4 源代码收集到的。此标志在独立模式下受到尊重。我还没有在集群模式下测试过。

我正在使用 spark 1.4.*

【讨论】:

嗨,ud3sh,在纱线集群模型和1.3版本中,我该怎么办? 'spark.streaming.stopGracefullyOnShutdown' 现在正式记录在 spark 配置页面 spark.apache.org/docs/latest/configuration.html#spark-streaming 如果您在每个上下文中实例化多个作业并希望单独停止作业怎么办? 如何获得 DRIVER_ID?我在日志或 YARN UI 中找不到类似的内容。【参考方案2】:

取决于用例以及如何使用驱动程序。

假设您想从 Spark 结构化流中收集一些 N 条记录(推文),将它们存储在 Postgresql 中,并在计数超过 N 条记录时停止流。

一种方法是使用累加器和 python 线程。

使用流查询对象和累加器创建一个 Python 线程,一旦超过计数就停止查询 启动流查询时,传递累加器变量并更新每批流的值。

分享代码 sn-p 用于理解/说明目的...

import threading
import time


def check_n_stop_streaming(query, acc, num_records=3500):
    while (True):
        if acc.value > num_records:
            print_info(f"Number of records received so far acc.value")
            query.stop()
            break
        else:
            print_info(f"Number of records received so far acc.value")
        time.sleep(1)
...

count_acc = spark.sparkContext.accumulator(0)

...

def postgresql_all_tweets_data_dump(df,
                                    epoch_id,
                                    raw_tweet_table_name,
                                    count_acc):

    print_info("Raw  Tweets...")
    df.select(["text"]).show(50, False)
    count_acc += df.count()

    mode = "append"
    url = "jdbc:postgresql://:/".format(self._postgresql_host,
                                              self._postgresql_port,
                                              self._postgresql_database)
    properties = "user": self._postgresql_user,
                  "password": self._postgresql_password,
                  "driver": "org.postgresql.Driver"
    df.write.jdbc(url=url, table=raw_tweet_table_name, mode=mode, properties=properties)

...

query = tweet_stream.writeStream.outputMode("append"). \
    foreachBatch(lambda df, id :
                 postgresql_all_tweets_data_dump(df=df,
                                                 epoch_id=id,
                                                 raw_tweet_table_name=raw_tweet_table_name,
                                                 count_acc=count_acc)).start()





stop_thread = threading.Thread(target=self.check_n_stop_streaming, args=(query, num_records, raw_tweet_table_name, ))
stop_thread.setDaemon(True)
stop_thread.start()

query.awaitTermination()
stop_thread.join()

【讨论】:

【参考方案3】:

如果您只需要停止运行流式应用程序,那么最简单的方法是通过 Spark 管理 UI(您可以在 Spark master 的启动日志中找到它的 URL)。

UI 中有一个部分显示正在运行的流式应用程序,每个应用程序 ID 附近都有微小的 (kill) url 按钮。

【讨论】:

在哪里?我只看到每个 stage 旁边的小 (kill) 按钮(在单个接收器上),但如果我杀死其中一个,另一个很快就会产生【参考方案4】:

现在是官方的,请在此处查看原始 apache 文档- http://spark.apache.org/docs/latest/configuration.html#spark-streaming

【讨论】:

请从与答案相关的链接中分享具体内容。仅仅发布一个链接并不能解决 OP 的问题。

以上是关于如何停止火花流作业?的主要内容,如果未能解决你的问题,请参考以下文章

如何直接在 Azure Blob 存储上存储火花作业(结构化流)的检查点?

一段时间后停止 Spark Streaming 作业

如何从程序中停止 flink 流作业

有没有办法在火花流中展平嵌套的 JSON?

替代递归运行Spark-submit作业

Terraform Azure 流分析作业 - 开始/停止