为 MRJob/pySpark 中的每个 SparkStep 创建新的 SparkContext
Posted
技术标签:
【中文标题】为 MRJob/pySpark 中的每个 SparkStep 创建新的 SparkContext【英文标题】:Creating new SparkContext for each SparkStep in MRJob/ pySpark 【发布时间】:2018-04-04 20:35:24 【问题描述】:我是 pySpark 的新手,我正在尝试使用 MRJob 实现多步骤 EMR/Spark 作业,我需要为每个 SparkStep 创建一个新的 SparkContext,还是可以为所有 SparkSteps 共享相同的 SparkContext?
我试图查找MRJob manual,但不幸的是,这并不清楚。
有人可以告诉我正确的方法是什么吗?
创建一个单独的 SparkContext:
class MRSparkJob(MRJob):
def spark_step1(self, input_path, output_path):
from pyspark import SparkContext
sc = SparkContext(appName='appname')
...
sc.stop()
def spark_step2(self, input_path, output_path):
from pyspark import SparkContext
sc = SparkContext(appName='appname')
...
sc.stop()
def steps(self):
return [SparkStep(spark=self.spark_step1),
SparkStep(spark=self.spark_step2)]
if __name__ == '__main__':
MRSparkJob.run()
创建一个 SparkContext 并在不同的 SparkSteps 之间共享它
class MRSparkJob(MRJob):
sc = None
def spark_step1(self, input_path, output_path):
from pyspark import SparkContext
self.sc = SparkContext(appName='appname')
...
def spark_step2(self, input_path, output_path):
from pyspark import SparkContext
... (reuse the same self.sc)
self.sc.stop()
def steps(self):
return [SparkStep(spark=self.spark_step1),
SparkStep(spark=self.spark_step2)]
if __name__ == '__main__':
MRSparkJob.run()
【问题讨论】:
【参考方案1】:根据MRJob discussion group 的 Dave 所说,我们应该为每个步骤创建一个新的 SparkContext,因为每个步骤都是对 Hadoop 和 Spark 的全新调用(即上面的 #1 是正确的方法)。
【讨论】:
以上是关于为 MRJob/pySpark 中的每个 SparkStep 创建新的 SparkContext的主要内容,如果未能解决你的问题,请参考以下文章
在执行spar-sql程序中报错:java.lang.NoSuchMethodError: org.apache.spark.internal.Logging.$init$(Lorg/apache/s
070 DStream中的transform和foreachRDD函数
在使用 Scala 中的 Spark 写入 JSON 格式之前,在每行前面添加一个新行