PySpark Overwrite 添加了 sc.addPyFile

Posted

技术标签:

【中文标题】PySpark Overwrite 添加了 sc.addPyFile【英文标题】:PySpark Overwrite added sc.addPyFile 【发布时间】:2017-06-06 06:05:18 【问题描述】:

我在这个路径下保存了这两个文件:

C:\code\sample1\main.py

def method():
    return "this is sample method 1"

C:\code\sample2\main.py

def method():
    return "this is sample method 2"

然后我运行这个:

from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext()
spark = SparkSession(sc)

sc.addPyFile("~/code/sample1/main.py")
main1 = __import__("main")
print(main1.method()) # this is sample method 1

sc.addPyFile("~/code/sample2/main.py") # Error

错误是

Py4JJavaError:调用 o21.addFile 时出错。 : org.apache.spark.SparkException: 文件 C:\Users\hans.yulian\AppData\Local\Temp\spark-5da165cf-410f-4576-8124-0ab23aba6aa3\userFiles-25a7ca23-84fb-42b7-95d9-206867fb9dfd\main .py 存在且与 /C:/Users/hans.yulian/Documents/spark-test/main2/main.py 的内容不匹配

这意味着它的临时文件夹中已经有“main.py”文件并且内容不同。我想知道这种情况是否有任何解决方法,但对我来说,我有这些限制:

    文件名还是要“main.py”,只能是文件夹 不一样的 可以通过某种方式清除临时文件夹以添加 aga 在另一个文件中我唯一的解决方案是附加随机 main.py 前面的字符串,例如 abcdemain.pyfghijmain.py,然后我将导入 main = __import__("abcdemain"), 但这不是真的更可取

【问题讨论】:

【参考方案1】:

虽然技术上可行,但通过将spark.files.overwrite 设置为"true"

from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().set("spark.files.overwrite", "true"))

在简单的情况下会给出正确的结果

def f(*_):                                                                   
    from main import method
    return [method()]

sc.addFile("/path/to/sample1/main.py") 
sc.parallelize([], 3).mapPartitions(f).collect()
['this is sample method 1',
'this is sample method 1',
'this is sample method 1']
sc.addFile("/path/to/sample2/main.py")

sc.parallelize([], 3).mapPartitions(f).collect()
['this is sample method 2',
 'this is sample method 2',
 'this is sample method 2']

它在实践中并不可靠,即使您在每次访问时reload 模块也会使您的应用程序难以推理。由于 Spark 可能会隐式缓存某些对象,或者透明地重新启动 Python 工作者,因此您很容易陷入不同节点看到源的不同状态的情况。

【讨论】:

以上是关于PySpark Overwrite 添加了 sc.addPyFile的主要内容,如果未能解决你的问题,请参考以下文章

如何删除或覆盖添加到 pyspark 作业的文件?

PySpark 评估

PySpark:当列是列表时,将列添加到 DataFrame

Jupyter中的PySpark SparkContext名称错误'sc'

火花塞覆盖测试

向数据框添加列并在 pyspark 中更新