Pyspark 从 S3 存储桶读取 csv 文件:AnalysisException:路径不存在

Posted

技术标签:

【中文标题】Pyspark 从 S3 存储桶读取 csv 文件:AnalysisException:路径不存在【英文标题】:Pyspark read csv file from S3 bucket : AnalysisException: Path does not exist 【发布时间】:2021-10-15 00:10:45 【问题描述】:

在 Google Colab 中,我试图让 PySpark 从 S3 存储桶中读取 csv。

这是我的代码:

# Read in data from S3 Buckets
from pyspark import SparkFiles
url = "https://bucket-name.s3.amazonaws.com/filename.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("filename.csv"), sep=",", header=True)

# Show DataFrame
df.show()

这是我的回报:

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-14-5d0cdc44d2c4> in <module>()
      4 url = "https://bucket-name.s3.amazonaws.com/filename.csv"
      5 spark.sparkContext.addFile(url)
----> 6 df = spark.read.csv(SparkFiles.get("filename.csv"), sep=",", header=True)
      7 
      8 # Show DataFrame

2 frames
/content/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/utils.py in deco(*a, **kw)
    115                 # Hide where the exception came from that shows a non-Pythonic
    116                 # JVM exception message.
--> 117                 raise converted from None
    118             else:
    119                 raise

AnalysisException: Path does not exist: file:/tmp/spark-d308539f-6371-4081-b6f4-e5f13ca7ed5b/userFiles-05f00260-eb10-4e31-8a5f-3abc12a17149/filename.csv

我正在尝试让它从 S3 存储桶中读取文件。我已启用对存储桶和文件的公共访问权限。

【问题讨论】:

【参考方案1】:

我的做法略有不同:

import boto3
import json
import io


def get_bucket(bucket_name: str):
  """
  Returns the specified bucket 
  :param: bucket_name str the bucket name to return
  :return: The bucket
  """
  s3 = boto3.resource("s3")
  bucket = s3.Bucket(bucket_name)
  return bucket


def read_file(bucket, key, encoding="utf-8") -> str:
  file_obj = io.BytesIO()
  bucket.download_fileobj(key, file_obj)
  wrapper = io.TextIOWrapper(file_obj, encoding=encoding)
  file_obj.seek(0)
  return wrapper.read() 

bucket = get_bucket("myBucket")
file_as_str = read_file(bucket, <KEY>)
csvData = spark.sparkContext.parallelize(io.StringIO(file_as_str))
df = spark.read.option("header", True).option("inferSchema", True).option("sep", ",").csv(csvData)


请注意,&lt;KEY&gt; 是您文件的 S3 密钥。

【讨论】:

以上是关于Pyspark 从 S3 存储桶读取 csv 文件:AnalysisException:路径不存在的主要内容,如果未能解决你的问题,请参考以下文章

从 S3 存储桶中读取大量 CSV 文件

如何从 S3 存储桶中读取最后修改的 csv 文件?

将文件从私有 S3 存储桶读取到 pandas 数据帧

如何从 S3 存储桶中仅读取最近 7 天的 csv 文件

在运行 AWS Glue ETL 作业并命名输出文件名时,有没有办法从 S3 存储桶中读取文件名。 pyspark 是不是提供了一种方法来做到这一点?

无法使用 Pyspark 2.4.4 读取 s3 存储桶中的镶木地板文件