如何拆除 SparkSession 并在一个应用程序中创建一个新的?
Posted
技术标签:
【中文标题】如何拆除 SparkSession 并在一个应用程序中创建一个新的?【英文标题】:How can I tear down a SparkSession and create a new one within one application? 【发布时间】:2017-01-05 18:18:17 【问题描述】:我有一个带有多个独立模块的 pyspark 程序,每个模块可以独立处理数据以满足我的各种需求。但它们也可以链接在一起以处理管道中的数据。这些模块中的每一个都构建了一个 SparkSession 并自行完美执行。
但是,当我尝试在同一个 python 进程中连续运行它们时,我遇到了问题。在管道中的第二个模块执行的那一刻,spark 抱怨我尝试使用的 SparkContext 已停止:
py4j.protocol.Py4JJavaError: An error occurred while calling o149.parquet.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
这些模块中的每一个都在执行开始时构建一个 SparkSession,并在其进程结束时停止 sparkContext。我像这样构建和停止会话/上下文:
session = SparkSession.builder.appName("myApp").getOrCreate()
session.stop()
根据official documentation,getOrCreate
“获取现有的 SparkSession,或者,如果没有现有的,则根据此构建器中设置的选项创建一个新的。”但我不想要这种行为(进程试图获取现有会话的这种行为)。我找不到任何方法来禁用它,也无法弄清楚如何销毁会话——我只知道如何停止其关联的 SparkContext。
如何在独立的模块中构建新的 SparkSession,并在同一个 Python 进程中按顺序执行它们,而以前的会话不会干扰新创建的会话?
以下是项目结构示例:
main.py
import collect
import process
if __name__ == '__main__':
data = collect.execute()
process.execute(data)
collect.py
import datagetter
def execute(data=None):
session = SparkSession.builder.appName("myApp").getOrCreate()
data = data if data else datagetter.get()
rdd = session.sparkContext.parallelize(data)
[... do some work here ...]
result = rdd.collect()
session.stop()
return result
process.py
import datagetter
def execute(data=None):
session = SparkSession.builder.appName("myApp").getOrCreate()
data = data if data else datagetter.get()
rdd = session.sparkContext.parallelize(data)
[... do some work here ...]
result = rdd.collect()
session.stop()
return result
【问题讨论】:
【参考方案1】:长话短说,Spark(包括 PySpark)并非旨在在单个应用程序中处理多个上下文。如果您对 JVM 方面的故事感兴趣,我建议您阅读SPARK-2243(已解决为无法修复)。
在 PySpark 中做出了许多反映这一点的设计决策,包括但不限于 a singleton Py4J gateway。有效you cannot have multiple SparkContexts
in a single application。 SparkSession
不仅绑定到 SparkContext
,而且还引入了它自己的问题,比如处理本地(独立)Hive 元存储(如果使用的话)。此外,还有在内部使用SparkSession.builder.getOrCreate
的函数,取决于你现在看到的行为。一个值得注意的例子是 UDF 注册。如果存在多个 SQL 上下文(例如 RDD.toDF
),其他函数可能会出现意外行为。
多个上下文不仅不受支持,而且在我个人看来,这违反了单一责任原则。您的业务逻辑不应该关心所有设置、清理和配置细节。
我的个人建议如下:
如果应用程序由多个连贯的模块组成,这些模块可以组合并受益于具有缓存和公共元存储的单个执行环境,则在应用程序入口点初始化所有必需的上下文,并在必要时将它们传递到各个管道:
main.py
:
from pyspark.sql import SparkSession
import collect
import process
if __name__ == "__main__":
spark: SparkSession = ...
# Pass data between modules
collected = collect.execute(spark)
processed = process.execute(spark, data=collected)
...
spark.stop()
collect.py
/process.py
:
from pyspark.sql import SparkSession
def execute(spark: SparkSession, data=None):
...
否则(根据您的描述,这里似乎是这种情况)我会设计入口点来执行单个管道并使用外部工作流管理器(如Apache Airflow 或Toil)来处理执行。
它不仅更干净,而且还允许更灵活的故障恢复和调度。
同样的事情当然也可以用建设者来完成,但就像 smart person 曾经说过的那样:显式优于隐式。
main.py
import argparse
from pyspark.sql import SparkSession
import collect
import process
pipelines = "collect": collect, "process": process
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--pipeline')
args = parser.parse_args()
spark: SparkSession = ...
# Execute a single pipeline only for side effects
pipelines[args.pipeline].execute(spark)
spark.stop()
collect.py
/ process.py
同上一点。
我会以一种或另一种方式保留一个且只有一个设置上下文的位置,以及一个且只有一个要拆除的位置。
【讨论】:
这个答案在我调试自己的问题时非常有帮助:尝试在一个 Python 应用程序中重新创建 Spark 上下文。它在一个测试套件中,我们希望每次都重新创建一个具有不同 JAR 依赖项的上下文。没有按预期运行,我们看到底层java
proc 正在执行中的根本原因。原来java
调用仅在第一个会话上构建(使用该会话的任何关联配置),并且从未重新创建。我明白为什么会这样。 TL;DR:来自重新创建的上下文的任何后续配置都将被忽略。正如答案所说:不要这样做。【参考方案2】:
这是一种解决方法,但不是解决方案:
我发现source code 中的SparkSession
类包含以下__init__
(我已从此处删除了不相关的代码行):
_instantiatedContext = None
def __init__(self, sparkContext, jsparkSession=None):
self._sc = sparkContext
if SparkSession._instantiatedContext is None:
SparkSession._instantiatedContext = self
因此,我可以通过在调用session.stop()
后将会话的_instantiatedContext
属性设置为None
来解决我的问题。当下一个模块执行时,它调用getOrCreate()
,并没有找到之前的_instantiatedContext
,所以它分配了一个新的sparkContext
。
这不是一个非常令人满意的解决方案,但它可以作为一种解决方法来满足我当前的需求。我不确定这种开始独立会话的整个方法是反模式还是不寻常。
【讨论】:
【参考方案3】:为什么不将同一个 Spark 会话实例传递到管道的多个阶段?您可以使用构建器模式。在我看来,您在每个阶段的最后都收集结果集,然后将这些数据传递到下一个阶段。考虑将集群中的数据留在同一个会话中,并将会话引用和结果引用从一个阶段传递到另一个阶段,直到您的应用程序完成。
换句话说,把
session = SparkSession.builder...
...在您的主目录中。
【讨论】:
【参考方案4】:spark_current_session = SparkSession. \
builder. \
appName('APP'). \
config(conf=SparkConf()). \
getOrCreate()
spark_current_session.newSession()
您可以从当前会话创建一个新会话
【讨论】:
请注意,如果先前的会话存在,则此处传入的任何配置都不会实际应用;配置仍将取自旧会话的SparkContext
。以上是关于如何拆除 SparkSession 并在一个应用程序中创建一个新的?的主要内容,如果未能解决你的问题,请参考以下文章