如何删除或覆盖添加到 pyspark 作业的文件?
Posted
技术标签:
【中文标题】如何删除或覆盖添加到 pyspark 作业的文件?【英文标题】:How to remove or override files that are added to pyspark job? 【发布时间】:2016-11-09 20:17:16 【问题描述】:我通过使用向 pyspark 上下文添加了一个 egg 文件
sc.addPyFile('/path/to/my_file.egg')
但是,如果我进行了一些更改并重建了我的 egg 文件。我不能再添加它。 Spark说文件已经存在,我不能再添加了。这里是stacktrace
org.apache.spark.SparkException: File /tmp/spark-ddfc2b0f-2897-4fac-8cf3-d7ccee04700c/userFiles-44152f58-835a-4d9f-acd6-f841468fa2cb/my_file.egg exists and does not match contents of file:///path/to/my_file.egg
at org.apache.spark.util.Utils$.copyFile(Utils.scala:489)
at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:595)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:394)
at org.apache.spark.SparkContext.addFile(SparkContext.scala:1409)
有没有办法告诉 spark 覆盖它?
谢谢,
【问题讨论】:
我也遇到了这个问题。看起来sc.clearFiles()
在版本 2 中是 removed。
也不适合我。我正在使用 Spark 1.6
【参考方案1】:
这可能取决于您的用例,但这并非万无一失!
spark = SparkSession.builder.appName('file_test').config("spark.files.overwrite", "true").getOrCreate()
# Adding spark.files.overwrite is actually letting Spark know that files added via sparkContext can be overwritten.
# user_file_path is where your file resides on the file system
spark.sparkContext.addPyFile(user_file_path)
# change the file and add again
spark.sparkContext.addPyFile(user_file_path)
虽然这在大多数情况下都有效,但我有时会遇到这种方法的问题。我正在动态添加 Python 文件来测试一些 UDF,我需要自动生成一些 Python 方法。当我使用同一个文件覆盖内容并添加到 Spark Py 文件时遇到了一些问题,看起来 Spark 在传播到所有执行程序之前会进行某种内容更改检查,并且似乎存在一些问题。我通过在每次内容更改时创建一个新文件来解决这个问题,它对我有用,因为我不关心我添加了多少文件。
【讨论】:
【参考方案2】:spark._sc.addPyFiles("some_file.py")
print(pyspark.SparkFiles.get("some_file.py") # expect: /tmp/xxx/xxx/xxx1/some_file.py
spark._sc.addPyFiles("some_file.py")
print(pyspark.SparkFiles.get("some_file.py") # expect: /tmp/xxx/xxx/xxx1/some_file.py
spark._sc.addPyFiles("some_file.py")
print(pyspark.SparkFiles.get("some_file.py")
# expect: /tmp/xxx/xxx/xxx1/some_file.py
spark._sc.stop()
spark._sc = spark.sparkContext
spark._sc.addPyFiles("some_file.py")
print(pyspark.SparkFiles.get("some_file.py")
# expect: /tmp/xxx/xxx/xxx2/some_file.py
重置sparkSession的_sc可以重新上传文件到executor,比重启SparkSession更快
【讨论】:
【参考方案3】:我可以删除(或覆盖)通过 sc.addPyiFiles() 添加的文件的唯一方法是重新启动 pyspark 解释器。
【讨论】:
以上是关于如何删除或覆盖添加到 pyspark 作业的文件?的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark:如何在 Yarn 集群上运行作业时对多个文件使用 --files 标签
如何使用 Python 或 Pyspark 或 scala 在数据块中获取笔记本的作业运行结果日志
您如何使用 boto3(或其他方式)在 emr 上自动化 pyspark 作业?
使用 pyspark 提交作业时,如何使用 --files 参数访问静态文件上传?