如何停止火花流作业?
Posted
技术标签:
【中文标题】如何停止火花流作业?【英文标题】:How do I stop a spark streaming job? 【发布时间】:2015-12-11 12:18:51 【问题描述】:我有一个连续运行的 Spark Streaming 作业。如何优雅地停止工作?我已经阅读了在作业监视中附加关闭挂钩并向作业发送 SIGTERM 的通常建议。
sys.ShutdownHookThread
logger.info("Gracefully stopping Application...")
ssc.stop(stopSparkContext = true, stopGracefully = true)
logger.info("Application stopped gracefully")
它似乎有效,但看起来不是停止工作的最干净的方法。我在这里错过了什么吗?
从代码的角度来看,这可能是有道理的,但您如何在集群环境中使用它?如果我们启动一个 spark 流式作业(我们将作业分布在集群中的所有节点上),我们将必须跟踪作业的 PID 和它运行的节点。最后,当我们必须停止进程时,我们需要跟踪作业在哪个节点上运行以及它的 PID。我只是希望有一种更简单的流式作业控制方法。
【问题讨论】:
你觉得什么不干净?我认为这是正确的。 我在问题中添加了更多细节。 【参考方案1】:您可以通过运行以下命令来停止集群模式下的流式传输上下文,而无需发送 SIGTERM。这将停止流上下文,而您无需使用线程挂钩显式停止它。
$SPARK_HOME_DIR/bin/spark-submit --master $MASTER_REST_URL --kill $DRIVER_ID
-$MASTER_REST_URL 是 spark 驱动的 rest url,比如 spark://localhost:6066
-$DRIVER_ID 类似于 driver-20150915145601-0000
如果您希望 spark 优雅地停止您的应用程序,您可以在最初提交您的 spark 应用程序时尝试设置以下系统属性(请参阅 http://spark.apache.org/docs/latest/submitting-applications.html 设置 spark 配置属性)。
spark.streaming.stopGracefullyOnShutdown=true
这没有正式记录,我是通过查看 1.4 源代码收集到的。此标志在独立模式下受到尊重。我还没有在集群模式下测试过。
我正在使用 spark 1.4.*
【讨论】:
嗨,ud3sh,在纱线集群模型和1.3版本中,我该怎么办? 'spark.streaming.stopGracefullyOnShutdown' 现在正式记录在 spark 配置页面 spark.apache.org/docs/latest/configuration.html#spark-streaming 如果您在每个上下文中实例化多个作业并希望单独停止作业怎么办? 如何获得 DRIVER_ID?我在日志或 YARN UI 中找不到类似的内容。【参考方案2】:取决于用例以及如何使用驱动程序。
假设您想从 Spark 结构化流中收集一些 N 条记录(推文),将它们存储在 Postgresql 中,并在计数超过 N 条记录时停止流。
一种方法是使用累加器和 python 线程。
使用流查询对象和累加器创建一个 Python 线程,一旦超过计数就停止查询 启动流查询时,传递累加器变量并更新每批流的值。分享代码 sn-p 用于理解/说明目的...
import threading
import time
def check_n_stop_streaming(query, acc, num_records=3500):
while (True):
if acc.value > num_records:
print_info(f"Number of records received so far acc.value")
query.stop()
break
else:
print_info(f"Number of records received so far acc.value")
time.sleep(1)
...
count_acc = spark.sparkContext.accumulator(0)
...
def postgresql_all_tweets_data_dump(df,
epoch_id,
raw_tweet_table_name,
count_acc):
print_info("Raw Tweets...")
df.select(["text"]).show(50, False)
count_acc += df.count()
mode = "append"
url = "jdbc:postgresql://:/".format(self._postgresql_host,
self._postgresql_port,
self._postgresql_database)
properties = "user": self._postgresql_user,
"password": self._postgresql_password,
"driver": "org.postgresql.Driver"
df.write.jdbc(url=url, table=raw_tweet_table_name, mode=mode, properties=properties)
...
query = tweet_stream.writeStream.outputMode("append"). \
foreachBatch(lambda df, id :
postgresql_all_tweets_data_dump(df=df,
epoch_id=id,
raw_tweet_table_name=raw_tweet_table_name,
count_acc=count_acc)).start()
stop_thread = threading.Thread(target=self.check_n_stop_streaming, args=(query, num_records, raw_tweet_table_name, ))
stop_thread.setDaemon(True)
stop_thread.start()
query.awaitTermination()
stop_thread.join()
【讨论】:
【参考方案3】:如果您只需要停止运行流式应用程序,那么最简单的方法是通过 Spark 管理 UI(您可以在 Spark master 的启动日志中找到它的 URL)。
UI 中有一个部分显示正在运行的流式应用程序,每个应用程序 ID 附近都有微小的 (kill)
url 按钮。
【讨论】:
在哪里?我只看到每个 stage 旁边的小(kill)
按钮(在单个接收器上),但如果我杀死其中一个,另一个很快就会产生【参考方案4】:
现在是官方的,请在此处查看原始 apache 文档- http://spark.apache.org/docs/latest/configuration.html#spark-streaming
【讨论】:
请从与答案相关的链接中分享具体内容。仅仅发布一个链接并不能解决 OP 的问题。以上是关于如何停止火花流作业?的主要内容,如果未能解决你的问题,请参考以下文章