如何删除或覆盖添加到 pyspark 作业的文件?

Posted

技术标签:

【中文标题】如何删除或覆盖添加到 pyspark 作业的文件?【英文标题】:How to remove or override files that are added to pyspark job? 【发布时间】:2016-11-09 20:17:16 【问题描述】:

我通过使用向 pyspark 上下文添加了一个 egg 文件

sc.addPyFile('/path/to/my_file.egg')

但是,如果我进行了一些更改并重建了我的 egg 文件。我不能再添加它。 Spark说文件已经存在,我不能再添加了。这里是stacktrace

org.apache.spark.SparkException: File /tmp/spark-ddfc2b0f-2897-4fac-8cf3-d7ccee04700c/userFiles-44152f58-835a-4d9f-acd6-f841468fa2cb/my_file.egg exists and does not match contents of file:///path/to/my_file.egg
    at org.apache.spark.util.Utils$.copyFile(Utils.scala:489)
    at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:595)
    at org.apache.spark.util.Utils$.fetchFile(Utils.scala:394)
    at org.apache.spark.SparkContext.addFile(SparkContext.scala:1409)

有没有办法告诉 spark 覆盖它?

谢谢,

【问题讨论】:

我也遇到了这个问题。看起来 sc.clearFiles() 在版本 2 中是 removed。 也不适合我。我正在使用 Spark 1.6 【参考方案1】:

这可能取决于您的用例,但这并非万无一失!

spark = SparkSession.builder.appName('file_test').config("spark.files.overwrite", "true").getOrCreate()
# Adding spark.files.overwrite is actually letting Spark know that files added via sparkContext can be overwritten.
# user_file_path is where your file resides on the file system
spark.sparkContext.addPyFile(user_file_path)
# change the file and add again
spark.sparkContext.addPyFile(user_file_path)

虽然这在大多数情况下都有效,但我有时会遇到这种方法的问题。我正在动态添加 Python 文件来测试一些 UDF,我需要自动生成一些 Python 方法。当我使用同一个文件覆盖内容并添加到 Spark Py 文件时遇到了一些问题,看起来 Spark 在传播到所有执行程序之前会进行某种内容更改检查,并且似乎存在一些问题。我通过在每次内容更改时创建一个新文件来解决这个问题,它对我有用,因为我不关心我添加了多少文件。

【讨论】:

【参考方案2】:
spark._sc.addPyFiles("some_file.py")
print(pyspark.SparkFiles.get("some_file.py") # expect: /tmp/xxx/xxx/xxx1/some_file.py
spark._sc.addPyFiles("some_file.py")
print(pyspark.SparkFiles.get("some_file.py") # expect: /tmp/xxx/xxx/xxx1/some_file.py

spark._sc.addPyFiles("some_file.py")
print(pyspark.SparkFiles.get("some_file.py") 
# expect: /tmp/xxx/xxx/xxx1/some_file.py
spark._sc.stop()
spark._sc = spark.sparkContext
spark._sc.addPyFiles("some_file.py")
print(pyspark.SparkFiles.get("some_file.py") 
# expect: /tmp/xxx/xxx/xxx2/some_file.py

重置sparkSession的_sc可以重新上传文件到executor,比重启SparkSession更快

【讨论】:

【参考方案3】:

我可以删除(或覆盖)通过 sc.addPyiFiles() 添加的文件的唯一方法是重新启动 pyspark 解释器。

【讨论】:

以上是关于如何删除或覆盖添加到 pyspark 作业的文件?的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark:如何在 Yarn 集群上运行作业时对多个文件使用 --files 标签

如何使用 Python 或 Pyspark 或 scala 在数据块中获取笔记本的作业运行结果日志

您如何使用 boto3(或其他方式)在 emr 上自动化 pyspark 作业?

使用 pyspark 提交作业时,如何使用 --files 参数访问静态文件上传?

Pyspark 基于另一个类似的数据框添加或删除数据框中的行

pyspark将列添加到列表中已经不存在的数据框