无法使用本地 PySpark 从 S3 读取 json 文件
Posted
技术标签:
【中文标题】无法使用本地 PySpark 从 S3 读取 json 文件【英文标题】:Unable to read json file from S3 using local PySpark 【发布时间】:2021-07-10 20:39:09 【问题描述】:我正在尝试在本地使用 PySpark 从 S3 读取 json 文件。这是代码
import os
import configparser
from pyspark.sql import SparkSession
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.hadoop:hadoop-aws:2.7.3" pyspark-shell'
config = configparser.ConfigParser()
config.read(os.path.expanduser("~/.aws/credentials"))
access_key = config.get("default", "aws_access_key_id")
secret_key = config.get("default", "aws_secret_access_key")
session_token = config.get("default", "aws_session_token")
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("ReadGoogleTrendsData") \
.master("local[1]") \
.getOrCreate()
sc=spark.sparkContext
# hadoop_conf=sc._jsc.hadoopConfiguration()
# hadoop_conf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
# hadoop_conf.set("fs.s3n.awsAccessKeyId", access_key)
# hadoop_conf.set("fs.s3n.awsSecretAccessKey", secret_key)
# hadoop_conf.set("fs.s3n.awsSessionToken", session_token)
hadoop_conf=sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.endpoint", "s3.amazonaws.com")
hadoop_conf.set("fs.s3a.access.key", access_key)
hadoop_conf.set("fs.s3a.secret.key", secret_key)
hadoop_conf.set("fs.s3a.session.token", session_token)
hadoop_conf.set("fs.s3a.connection.ssl.enabled", "true")
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
df = spark.read.json("s3a://gamma-rp/trends-data/trends-2021-07-01.json")
df.printSchema()
df.show(5)
运行时出现以下错误
File "/Users/rppatwa/Downloads/Spark/spark-2.4.5-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o37.json.
: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: NB1WXDBQJ5S848AM, AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: HeI/luU5fMpN1dehMo9+yDzMAXin2j7AZPo3STRqbvx56rDcUotMdze08cJz08s7P581ATdRmck=
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:892)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:557)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:355)
at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:392)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
我查看了这个错误,似乎我确实拥有存储桶的权限,因为我可以使用 AWS cli cp 命令将文件下载到我的本地计算机。 你能告诉我我在这里缺少什么吗?非常感谢!
【问题讨论】:
如果以下答案有帮助请告诉我 对不起,它没有 - 因为我不确定哪个配置导致 com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403 以上。我没有看到特定配置,因为您的代码可以工作而不是我的。我正在使用 AWS STS 使用临时凭证。 sessionToken 有什么用?你怎么知道的?可能是该令牌没有特权或不是有效的令牌。或类似的东西。为什么需要会话令牌? 我尝试删除会话令牌,但仍然得到相同的错误 你能打印出你从这些access_key = config.get("default", "aws_access_key_id")
和secret_key = config.get("default", "aws_secret_access_key")
得到的值是多少
【参考方案1】:
看来问题出在您的配置上。我之前也遇到过类似的问题,但我有以下工作成功地保存了从 S3 存储桶获取数据的 abd。
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, TimestampType
spark = SparkSession \
.builder \
.appName("SparkExample") \
.getOrCreate()
spark_context = spark.sparkContext
spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", '<KEY>')
spark_context._jsc.hadoopConfiguration().set("fs.s3a.secret.key", 'SECRET_KEY')
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
schema = StructType().add("_id", StringType()) \
.add("employer", StringType()) \
.add("created_at", TimestampType()) \
.add("name", StringType())
employees = ['_id': 1,
'employer': 'Microsoft',
'created_at': datetime.now(),
'name': 'Noel'
,
'_id': 2,
'employer': 'Apple',
'created_at': datetime.now(),
'name': 'Steve'
,
]
df = spark.createDataFrame(employees, schema=schema)
df.write \
.format("json") \
.mode("append") \
.save("s3a://<YOUR BUCKET>/employeesjson")
collect = spark.read.format("json").load(
"s3a://<YOUR BUCKET>/employeesjson").collect()
print(len(collect))
【讨论】:
【参考方案2】:我能够通过将 spark 从使用 hadoop 2.7 更新到 hadoop 3.2 来解决此问题
【讨论】:
以上是关于无法使用本地 PySpark 从 S3 读取 json 文件的主要内容,如果未能解决你的问题,请参考以下文章
无法从 S3 读取 csv 到 AWS 上 EC2 实例上的 pyspark 数据帧
无法使用 Pyspark 2.4.4 读取 s3 存储桶中的镶木地板文件
使用 pyspark 从 s3 位置读取镶木地板文件的文件夹到 pyspark 数据帧
使用 pyspark 从 S3 服务器读取时出错:[java.lang.IllegalArgumentException]