如何拆除 SparkSession 并在一个应用程序中创建一个新的?

Posted

技术标签:

【中文标题】如何拆除 SparkSession 并在一个应用程序中创建一个新的?【英文标题】:How can I tear down a SparkSession and create a new one within one application? 【发布时间】:2017-01-05 18:18:17 【问题描述】:

我有一个带有多个独立模块的 pyspark 程序,每个模块可以独立处理数据以满足我的各种需求。但它们也可以链接在一起以处理管道中的数据。这些模块中的每一个都构建了一个 SparkSession 并自行完美执行。

但是,当我尝试在同一个 python 进程中连续运行它们时,我遇到了问题。在管道中的第二个模块执行的那一刻,spark 抱怨我尝试使用的 SparkContext 已停止:

py4j.protocol.Py4JJavaError: An error occurred while calling o149.parquet.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.

这些模块中的每一个都在执行开始时构建一个 SparkSession,并在其进程结束时停止 sparkContext。我像这样构建和停止会话/上下文:

session = SparkSession.builder.appName("myApp").getOrCreate()
session.stop()

根据official documentation,getOrCreate“获取现有的 SparkSession,或者,如果没有现有的,则根据此构建器中设置的选项创建一个新的。”但我不想要这种行为(进程试图获取现有会话的这种行为)。我找不到任何方法来禁用它,也无法弄清楚如何销毁会话——我只知道如何停止其关联的 SparkContext。

如何在独立的模块中构建新的 SparkSession,并在同一个 Python 进程中按顺序执行它们,而以前的会话不会干扰新创建的会话?

以下是项目结构示例:

ma​​in.py

import collect
import process

if __name__ == '__main__':
    data = collect.execute()
    process.execute(data)

collect.py

import datagetter

def execute(data=None):
    session = SparkSession.builder.appName("myApp").getOrCreate()

    data = data if data else datagetter.get()
    rdd = session.sparkContext.parallelize(data)
    [... do some work here ...]
    result = rdd.collect()
    session.stop()
    return result

process.py

import datagetter

def execute(data=None):
    session = SparkSession.builder.appName("myApp").getOrCreate()
    data = data if data else datagetter.get()
    rdd = session.sparkContext.parallelize(data)
    [... do some work here ...]
    result = rdd.collect()
    session.stop()
    return result

【问题讨论】:

【参考方案1】:

长话短说,Spark(包括 PySpark)并非旨在在单个应用程序中处理多个上下文。如果您对 JVM 方面的故事感兴趣,我建议您阅读SPARK-2243(已解决为无法修复)。

在 PySpark 中做出了许多反映这一点的设计决策,包括但不限于 a singleton Py4J gateway。有效you cannot have multiple SparkContexts in a single application。 SparkSession 不仅绑定到 SparkContext,而且还引入了它自己的问题,比如处理本地(独立)Hive 元存储(如果使用的话)。此外,还有在内部使用SparkSession.builder.getOrCreate 的函数,取决于你现在看到的行为。一个值得注意的例子是 UDF 注册。如果存在多个 SQL 上下文(例如 RDD.toDF),其他函数可能会出现意外行为。

多个上下文不仅不受支持,而且在我个人看来,这违反了单一责任原则。您的业​​务逻辑不应该关心所有设置、清理和配置细节。

我的个人建议如下:

如果应用程序由多个连贯的模块组成,这些模块可以组合并受益于具有缓存和公共元存储的单个执行环境,则在应用程序入口点初始化所有必需的上下文,并在必要时将它们传递到各个管道:

main.py:

from pyspark.sql import SparkSession

import collect
import process

if __name__ == "__main__":
    spark: SparkSession = ...

    # Pass data between modules
    collected = collect.execute(spark)
    processed = process.execute(spark, data=collected)
    ...
    spark.stop()

collect.py/process.py:

from pyspark.sql import SparkSession

def execute(spark: SparkSession, data=None):
    ...

否则(根据您的描述,这里似乎是这种情况)我会设计入口点来执行单个管道并使用外部工作流管理器(如Apache Airflow 或Toil)来处理执行。

它不仅更干净,而且还允许更灵活的故障恢复和调度。

同样的事情当然也可以用建设者来完成,但就像 smart person 曾经说过的那样:显式优于隐式。

main.py

import argparse

from pyspark.sql import SparkSession

import collect
import process

pipelines = "collect": collect, "process": process

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--pipeline')
    args = parser.parse_args()

    spark: SparkSession = ...

    # Execute a single pipeline only for side effects
    pipelines[args.pipeline].execute(spark)
    spark.stop()

collect.py / process.py 同上一点。

我会以一种或另一种方式保留一个且只有一个设置上下文的位置,以及一个且只有一个要拆除的位置。

【讨论】:

这个答案在我调试自己的问题时非常有帮助:尝试在一个 Python 应用程序中重新创建 Spark 上下文。它在一个测试套件中,我们希望每次都重新创建一个具有不同 JAR 依赖项的上下文。没有按预期运行,我们看到底层java proc 正在执行中的根本原因。原来java 调用仅在第一个会话上构建(使用该会话的任何关联配置),并且从未重新创建。我明白为什么会这样。 TL;DR:来自重新创建的上下文的任何后续配置都将被忽略。正如答案所说:不要这样做。【参考方案2】:

这是一种解决方法,但不是解决方案:

我发现source code 中的SparkSession 类包含以下__init__(我已从此处删除了不相关的代码行):

_instantiatedContext = None

def __init__(self, sparkContext, jsparkSession=None):
    self._sc = sparkContext
    if SparkSession._instantiatedContext is None:
        SparkSession._instantiatedContext = self

因此,我可以通过在调用session.stop() 后将会话的_instantiatedContext 属性设置为None 来解决我的问题。当下一个模块执行时,它调用getOrCreate(),并没有找到之前的_instantiatedContext,所以它分配了一个新的sparkContext

这不是一个非常令人满意的解决方案,但它可以作为一种解决方法来满足我当前的需求。我不确定这种开始独立会话的整个方法是反模式还是不寻常。

【讨论】:

【参考方案3】:

为什么不将同一个 Spark 会话实例传递到管道的多个阶段?您可以使用构建器模式。在我看来,您在每个阶段的最后都收集结果集,然后将这些数据传递到下一个阶段。考虑将集群中的数据留在同一个会话中,并将会话引用和结果引用从一个阶段传递到另一个阶段,直到您的应用程序完成。

换句话说,把

session = SparkSession.builder...

...在您的主目录中。

【讨论】:

【参考方案4】:
spark_current_session = SparkSession. \
                             builder. \
                             appName('APP'). \
                             config(conf=SparkConf()). \
                             getOrCreate()
spark_current_session.newSession()

您可以从当前会话创建一个新会话

【讨论】:

请注意,如果先前的会话存在,则此处传入的任何配置都不会实际应用;配置仍将取自旧会话的SparkContext

以上是关于如何拆除 SparkSession 并在一个应用程序中创建一个新的?的主要内容,如果未能解决你的问题,请参考以下文章

iTunes Connect 促销代码和应用程​​序更新

如何在多线程 C++ 中拆除观察者关系?

如何使用 SparkSession 从列表中创建数据框?

使用SparkSession.builder时如何设置profiler_cls?

esp32多个例程如何组合

协程解析前的响应,kotlin