为 MRJob/pySpark 中的每个 SparkStep 创建新的 SparkContext

Posted

技术标签:

【中文标题】为 MRJob/pySpark 中的每个 SparkStep 创建新的 SparkContext【英文标题】:Creating new SparkContext for each SparkStep in MRJob/ pySpark 【发布时间】:2018-04-04 20:35:24 【问题描述】:

我是 pySpark 的新手,我正在尝试使用 MRJob 实现多步骤 EMR/Spark 作业,我需要为每个 SparkStep 创建一个新的 SparkContext,还是可以为所有 SparkSteps 共享相同的 SparkContext?

我试图查找MRJob manual,但不幸的是,这并不清楚。

有人可以告诉我正确的方法是什么吗?

    创建一个单独的 SparkContext:

    class MRSparkJob(MRJob):
        def spark_step1(self, input_path, output_path):
            from pyspark import SparkContext
            sc = SparkContext(appName='appname')
            ...
            sc.stop()
    
        def spark_step2(self, input_path, output_path):
            from pyspark import SparkContext
            sc = SparkContext(appName='appname')
            ...
            sc.stop()
    
        def steps(self):
            return [SparkStep(spark=self.spark_step1),
                    SparkStep(spark=self.spark_step2)]
    
    if __name__ == '__main__':
        MRSparkJob.run()
    

    创建一个 SparkContext 并在不同的 SparkSteps 之间共享它

    class MRSparkJob(MRJob):
    
        sc = None
    
        def spark_step1(self, input_path, output_path):
            from pyspark import SparkContext
            self.sc = SparkContext(appName='appname')
            ...
    
    
        def spark_step2(self, input_path, output_path):
            from pyspark import SparkContext
    
            ... (reuse the same self.sc)
            self.sc.stop()
    
        def steps(self):
            return [SparkStep(spark=self.spark_step1),
                    SparkStep(spark=self.spark_step2)]
    
    if __name__ == '__main__':
        MRSparkJob.run()
    

【问题讨论】:

【参考方案1】:

根据MRJob discussion group 的 Dave 所说,我们应该为每个步骤创建一个新的 SparkContext,因为每个步骤都是对 Hadoop 和 Spark 的全新调用(即上面的 #1 是正确的方法)。

【讨论】:

以上是关于为 MRJob/pySpark 中的每个 SparkStep 创建新的 SparkContext的主要内容,如果未能解决你的问题,请参考以下文章

Spark- Spar架构原理

在执行spar-sql程序中报错:java.lang.NoSuchMethodError: org.apache.spark.internal.Logging.$init$(Lorg/apache/s

070 DStream中的transform和foreachRDD函数

在使用 Scala 中的 Spark 写入 JSON 格式之前,在每行前面添加一个新行

r:将 matrix.csr 转换为矩阵。 as.vector(data) 中的错误

Zeppelin:如何在 zeppelin 中重新启动 sparkContext