无法从 S3 读取 csv 到 AWS 上 EC2 实例上的 pyspark 数据帧
Posted
技术标签:
【中文标题】无法从 S3 读取 csv 到 AWS 上 EC2 实例上的 pyspark 数据帧【英文标题】:Can't read csv from S3 to pyspark dataframe on a EC2 instance on AWS 【发布时间】:2020-08-19 20:15:45 【问题描述】:我无法将 csv 文件从 S3 读取到 AWS 云上 EC2 实例上的 pyspark 数据帧。 我使用 Flintrock 在 AWS 上创建了一个 spark 集群。 这是我的 Flintrock 配置文件(在本地机器上):
services:
spark:
version: 3.0.0
hdfs:
version: 2.7.3
provider: ec2
providers:
ec2:
key-name: xxxx
identity-file: /home/yyyy/keys/xxxx.pem
instance-type: t2.micro
region: us-east-1
ami: ami-02354e95b39ca8dec
user: ec2-user
launch:
num-slaves: 1
install-hdfs: False
然后我在AWS上启动集群如下:
flintrock launch mysparkcluster
集群已创建并且似乎可以工作。 然后我安装python3如下:
flintrock run-command mysparkcluster 'sudo yum install -y python3'
然后我登录到主节点:
flintrock login mysparkcluster
然后我做:
export PYSPARK_PYTHON=/usr/bin/python3
然后我启动 pyspark shell(到目前为止它可以工作!):
pyspark --master spark://0.0.0.0:7077 --packages org.apache.hadoop:hadoop-aws:2.7.4
在下面的 pyspark shell 中,我设置了所需的凭据。由于我使用的是 aws Education 帐户,因此我的理解是我只能获得临时会话,除了 access-key-id 和密钥之外,我还需要会话令牌:
from pyspark.sql import SQLContext
sqlc = SQLContext(sc)
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", "KEYXYZ")
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "SECRETXYZ")
spark._jsc.hadoopConfiguration().set("fs.s3a.session.token", "VERYLONGTOKEN")
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
spark._jsc.hadoopConfiguration().set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com")
然后我尝试按如下方式读取 csv 文件:
df = sqlc.read.csv('s3a://mybucket/myfile.csv', header='true', inferSchema='true')
我收到以下错误:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/ec2-user/spark/python/pyspark/sql/readwriter.py", line 535, in csv
return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
File "/home/ec2-user/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
File "/home/ec2-user/spark/python/pyspark/sql/utils.py", line 131, in deco
return f(*a, **kw)
File "/home/ec2-user/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o51.csv.
: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: EEAD03F2F4012750, AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: mi9O78oh2QbtklTCrCQkv6SuPFR0UR6zl5CB4kuHTCJD7mdNrA6s5R8oejWJ0MAlAS8zOPJY7FY=
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:892)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77)
at org.apache.hadoop.fs.FileSystem.isDirectory(FileSystem.java:1439)
at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:47)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:361)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:705)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
我做错了什么?
提前感谢您的提示!
【问题讨论】:
【参考方案1】:我在 python 代码中通过 hadoopConfiguration().set() 提供凭据的方式可能是错误的。但是还有另一种方法可以配置 flintrock(以及更普遍的 EC2 实例)以能够访问 S3 而无需在代码中提供凭证(这实际上是在处理来自 AWS 的临时凭证时这样做的推荐方式)。以下帮助:
flintrock docu,表示“设置授予 根据需要访问 S3。启动时引用此角色 使用 --ec2-instance-profile-name 选项(或其 config.yaml 文件中的等效项)。” This AWS documentation page 一步一步解释如何做。 Another useful AWS docu page. 请注意:如果您通过 AWS 创建上述角色 控制台然后具有相同名称的相应实例配置文件是 自动创建,否则(如果您使用 awscli 或 AWS API)您 必须手动创建所需的实例配置文件作为额外的 步骤。【讨论】:
以上是关于无法从 S3 读取 csv 到 AWS 上 EC2 实例上的 pyspark 数据帧的主要内容,如果未能解决你的问题,请参考以下文章
PySpark - Spark 集群 EC2 - 无法保存到 S3
当我们从 s3 中的 csv 文件读取数据并在 aws athena 中创建表时如何跳过标题。
AWS Glue - 从 sql server 表中读取并作为自定义 CSV 文件写入 S3