一个SparkContext对应多个SparkSession

Posted dqz_nihao

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一个SparkContext对应多个SparkSession相关的知识,希望对你有一定的参考价值。

原英文文档地址

这篇文章将回答SparkContext和SparkSession的问题。它的第一部分解释了提到的两个SparkSession和SparkContext对象的作用。第二部分讲述了为同一个SparkContext定义多个SparkSession的可能性,最后则试图给出一些使用案例。

SparkSession and SparkContext

为了更好地理解本文所讨论的问题,定义我们将要讨论的内容是很重要的。第一个讨论点是SparkContext。它是位于driver端的Spark程序的入口点。它是一个到Spark集群的物化连接,提供了创建rdd、累加器和广播变量所需的所有抽象。我们只能使用一个有效活动的SparkContext,否则spark将抛出一个在同一个JVM中只能有一个运行的SparkContext的错误(参见SPARK-2243)。不过,当我们设置spark.driver.allowMultipleContexts等于true时,这个异常可以避免。

"two SparkContexts created with allowMultipleContexts=true" should "work" in 
    val sparkConfiguration = new SparkConf().set("spark.driver.allowMultipleContexts", "true")
    val sparkContext1 = new SparkContext("local", "SparkContext#1", sparkConfiguration)
    val sparkContext2 = new SparkContext("local", "SparkContext#2", sparkConfiguration)
  

  "two SparkContexts" should "fail the processing" in 
    val exception = intercept[Exception] 
      new SparkContext("local", "SparkContext#1")
      new SparkContext("local", "SparkContext#2")
    

    exception.getMessage should startWith("Only one SparkContext may be running in this JVM (see SPARK-2243)")
  

然而,在同一个JVM中拥有多个sparkcontext并不是一个好的实现。它对单元测试很有用,顺便说一下,这是它在Apache Spark library的主要用途。在测试范围之外,不能保证我们的程序在多个活动的SparkContext中正确地工作。此外,这也使得对我们的程序数据流管理更加困难。工作流并不是孤立的——一个上下文的潜在故障可能影响另一个上下文,甚至可能破坏整个JVM。它还会给driver程序做的所有事情带来额外的硬件压力。即使我们用toLocalIterator收集了一部分数据,但是要处理的数据总是比单独的进程多很多倍。

SparkContext的一个缺点是只能处理某一特定的场景。比如为了使用Hive,我们需要使用HiveContext,想处理结构化数据,就必须使用SQLContext实例,想处理实时流数据,就得用StreamingContext。然而SparkSession解决了这个问题,它是所有不同管道的一个公共入口点。SparkSession的实例是用一个通用的构建器构造的,除了Hive需要调用enableHive()方法。

SparkSessions sharing SparkContext

如前所述,每个 JVM 拥有多个 SparkContext 在技术上是可行的,但同时它被认为是一种不好的做法。 Apache Spark 提供了一个工厂方法 getOrCreate() 来防止创建多个 SparkContext:

"two SparkContext created with a factory method" should "not fail" in 
    // getOrCreate is a factory method working with singletons
    val sparkContext1 = SparkContext.getOrCreate(new SparkConf().setAppName("SparkContext#1").setMaster("local"))
    val sparkContext2 = SparkContext.getOrCreate(new SparkConf().setAppName("SparkContext#2").setMaster("local"))

    sparkContext1.parallelize(Seq(1, 2, 3))
    sparkContext2.parallelize(Seq(4, 5, 6))

    sparkContext1 shouldEqual sparkContext2
  

由于sparkSession的特点,有多个sparkSession是可能的。SparkSession是SparkContext的包装器。context是由构建器隐式创建的,没有任何额外的配置选项:

 "Spark" should "create 2 SparkSessions" in 
    val sparkSession1 = SparkSession.builder().appName("SparkSession#1").master("local").getOrCreate()
    val sparkSession2 = sparkSession1.newSession()

    sparkSession1.sparkContext shouldEqual sparkSession2.sparkContext
    sparkSession1.stop()
    // Since both SparkContexts are equal, stopping the one for sparkSession1 will make the context of
    // sparkSession2 stopped too
    sparkSession2.sparkContext.isStopped shouldBe true
    // and that despite the different sessions
    sparkSession1 shouldNot equal(sparkSession2)
  

但正如您所注意到的,如果我们继续使用构建器,我们将始终获得相同的 SparkSession 实例:

"Spark" should "create 1 instance of SparkSession with builder"in 
    val sparkSession1 = SparkSession.builder().appName("SparkSession#1").master("local").getOrCreate()
    val sparkSession2 = SparkSession.builder().appName("SparkSession#2").master("local").getOrCreate()

    sparkSession1 shouldEqual sparkSession2
  

Multiple SparkSessions use cases

因此,拥有多个 SparkSession 非常好——至少我们不需要使用特定的配置选项。但是,为什么我们需要它?第一个明显的用例——当我们需要使用来自不同 SparkSessions 的数据时,这些数据不能共享相同的配置。例如,配置可能涉及 2 个不同的 Hive 元存储及其必须以某种方式混合在一起的数据。仅出于说明目的,我使用 JSON 文件尝试了此场景:

  "Spark" should "launch 2 different apps for reading JSON files" in 
    val commonDataFile = new File("/tmp/spark_sessions/common_data.jsonl")
    val commonData =
      """
        | "id": 1, "name": "A"
        | "id": 2, "name": "B"
        | "id": 3, "name": "C"
        | "id": 4, "name": "D"
        | "id": 5, "name": "E"
        | "id": 6, "name": "F"
        | "id": 7, "name": "G"
      """.stripMargin
    FileUtils.writeStringToFile(commonDataFile, commonData)
    val dataset1File = new File("/tmp/spark_sessions/dataset_1.jsonl")
    val dataset1Data =
      """
        | "value": 100, "join_key": 1
        | "value": 300, "join_key": 3
        | "value": 500, "join_key": 5
        | "value": 700, "join_key": 7
      """.stripMargin
    FileUtils.writeStringToFile(dataset1File, dataset1Data)
    val dataset2File = new File("/tmp/spark_sessions/dataset_2.jsonl")
    val dataset2Data =
      """
        | "value": 200, "join_key": 2
        | "value": 400, "join_key": 4
        | "value": 600, "join_key": 6
      """.stripMargin
    FileUtils.writeStringToFile(dataset2File, dataset2Data)
    // Executed against standalone cluster to better see that there is only 1 Spark application created
    val sparkSession = SparkSession.builder().appName(s"SparkSession for 2 different sources").master("local")
      .config("spark.executor.extraClassPath", sys.props("java.class.path"))
      .getOrCreate()
    val commonDataset = sparkSession.read.json(commonDataFile.getAbsolutePath)
    commonDataset.cache()
    import org.apache.spark.sql.functions._
    val oddNumbersDataset = sparkSession.read.json(dataset1File.getAbsolutePath)
      .join(commonDataset, col("id") === col("join_key"), "left")
    val oddNumbers = oddNumbersDataset.collect()


    // Without stop the SparkSession is represented under the same name in the UI and the master remains the same
    // sparkSession.stop()
    // But if you stop the session you won't be able to join the data from the second session with a dataset from the first session
    SparkSession.clearActiveSession()
    SparkSession.clearDefaultSession()

    val sparkSession2 = SparkSession.builder().appName(s"Another Spark session").master("local")
      .config("spark.executor.extraClassPath", sys.props("java.class.path"))
      .getOrCreate()

    SparkSession.setDefaultSession(sparkSession2)
    val pairNumbersDataset = sparkSession2.read.json(dataset2File.getAbsolutePath)
      .join(commonDataset, col("id") === col("join_key"), "left")

    val pairNumbers = pairNumbersDataset.collect()

    sparkSession shouldNot equal(sparkSession2)
    def stringifyRow(row: Row): String = 
      s"$row.getAs[Int]("id")-$row.getAs[String]("name")-$row.getAs[Int]("value")"
    
    val oddNumbersMapped = oddNumbers.map(stringifyRow(_))
    oddNumbersMapped should have size 4
    oddNumbersMapped should contain allOf("1-A-100", "3-C-300", "5-E-500", "7-G-700")
    val pairNumbersMapped = pairNumbers.map(stringifyRow(_))
    pairNumbersMapped should have size 3
    pairNumbersMapped should contain allOf("2-B-200", "4-D-400", "6-F-600")
  

另外,至少在理论上,我们可以从一个公共代码中启动两个不同的独立Spark作业。使用编排工具似乎是一个更好的主意,因为在第二个会话失败的情况下,您只需要重新启动它,而不需要重新计算第一个数据集。

另一个我们可以使用多个SparkSessions的纯理论例子是,当一些外部输入定义了不共享要启动的相同配置的作业数量:

  "Spark" should "launch 3 applications in 3 different threads" in 
    val logAppender = InMemoryLogAppender.createLogAppender(Seq("SparkSession#0",
      "SparkSession#1", "SparkSession#2"))
    val latch = new CountDownLatch(3)
    (0 until 3).map(nr => new Thread(new Runnable() 
      override def run(): Unit = 
        // You can submit this application to a standalone cluster to see in the UI that always only 1 app name
        // is picked up and despite of that, all 3 applications are executed inside
        val config = new SparkConf().setMaster("local").setAppName(s"SparkSession#$nr")
          .set("spark.executor.extraClassPath", sys.props("java.class.path"))
        val sparkSession = SparkSession.builder().config(config)
          .getOrCreate()
        import sparkSession.implicits._
        val dataset = (0 to 5).map(nr => (nr, s"$nr")).toDF("id", "label")
        val rowsIds = dataset.collect().map(row => row.getAs[Int]("id")).toSeq
        // Give some time to make the observations
        Thread.sleep(3000)
        println(s"adding $rowsIds")
        AccumulatedRows.data.appendAll(Seq(rowsIds))
        latch.countDown()
      
    ).start())
    latch.await(3, TimeUnit.MINUTES)
    // Add a minute to prevent against race conditions
    Thread.sleep(1000L)

    AccumulatedRows.data should have size 3
    AccumulatedRows.data.foreach(rows => rows should contain allOf(0, 1, 2, 3, 4, 5))
    logAppender.getMessagesText() should have size 1
    // Since it's difficult to deduce which application was submitted, we check only the beginning of the log message
    val firstMessage = logAppender.getMessagesText()(0)
    firstMessage should startWith("Submitted application: SparkSession#")
    val Array(_, submittedSessionId) = firstMessage.split("#")
    val allSessions = Seq("0", "1", "2")
    val missingSessionsFromLog = allSessions.diff(Seq(submittedSessionId))
    missingSessionsFromLog should have size 2
  

虽然SparkSession封装了SparkContext,并且由于默认情况下每个JVM只能有一个context,所以上面示例中的所有SparkSession都在单个应用程序的UI中表示——通常是第一个启动的应用程序。下图显示了多次执行上述代码片段后的UI。如你所见,执行的应用程序总是使用3个提交的名字中的一个:

因此,即使我们为每个SparkSession指定了不同的配置,例如不同的主地址,它也不会有任何效果。Spark将始终使用第一个启动会话的配置,因此,也将使用第一个创建的SparkContext的配置。当然,我们可以通过调用给定SparkSession实例的stop()方法强制停止上下文。但在这种情况下,我们失去了与停止会话创建的DataFrames交互的可能性。

前面描述的行为证明了调试这样的应用程序是困难的,而使用编排工具独立提交这3个作业则要容易得多。除了监视功能之外,此解决方案还受益于更简单的数据恢复过程和更简单的代码——失败或成功作业的逻辑留给编排工具,处理代码可以专注于它应该如何最好地处理数据。

这篇文章解释了SparkContext和SparkSession之间的交互。第一部分介绍了两个负责管理rdd、广播变量、累加器和DataFrames的类。第二部分展示了如何在单个JVM中拥有SparkContext和SparkSession的多个实例。尽管这不是一个好的建议,但通过一些额外的配置工作,这是可能的。第三部分试图解释为什么可以使用多个sparksession。但所提出的任何一个用例都不能让人信服。每次多个作业的外部编排的解决方案似乎都更容易调试、监控和恢复。

以上是关于一个SparkContext对应多个SparkSession的主要内容,如果未能解决你的问题,请参考以下文章

一个SparkContext对应多个SparkSession

Apache Spark 启动多个 SparkContext 实例

为什么每个JVM只允许一个SparkContext?

Spark架构解析(转)

spark源码解读-SparkContext初始化过程

Spark架构角色及基本运行流程