无法使用本地 PySpark 从 S3 读取 json 文件

Posted

技术标签:

【中文标题】无法使用本地 PySpark 从 S3 读取 json 文件【英文标题】:Unable to read json file from S3 using local PySpark 【发布时间】:2021-07-10 20:39:09 【问题描述】:

我正在尝试在本地使用 PySpark 从 S3 读取 json 文件。这是代码

import os

import configparser
from pyspark.sql import SparkSession

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.hadoop:hadoop-aws:2.7.3" pyspark-shell'

config = configparser.ConfigParser()
config.read(os.path.expanduser("~/.aws/credentials"))
access_key = config.get("default", "aws_access_key_id")
secret_key = config.get("default", "aws_secret_access_key")
session_token = config.get("default", "aws_session_token")

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("ReadGoogleTrendsData") \
        .master("local[1]") \
        .getOrCreate()

    sc=spark.sparkContext

    # hadoop_conf=sc._jsc.hadoopConfiguration()
    # hadoop_conf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
    # hadoop_conf.set("fs.s3n.awsAccessKeyId", access_key)
    # hadoop_conf.set("fs.s3n.awsSecretAccessKey", secret_key)
    # hadoop_conf.set("fs.s3n.awsSessionToken", session_token)

    hadoop_conf=sc._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.endpoint", "s3.amazonaws.com")
    hadoop_conf.set("fs.s3a.access.key", access_key)
    hadoop_conf.set("fs.s3a.secret.key", secret_key)
    hadoop_conf.set("fs.s3a.session.token", session_token)
    hadoop_conf.set("fs.s3a.connection.ssl.enabled", "true")
    hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");

    df = spark.read.json("s3a://gamma-rp/trends-data/trends-2021-07-01.json")
    df.printSchema()
    df.show(5)

运行时出现以下错误

File "/Users/rppatwa/Downloads/Spark/spark-2.4.5-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o37.json.
: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: NB1WXDBQJ5S848AM, AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: HeI/luU5fMpN1dehMo9+yDzMAXin2j7AZPo3STRqbvx56rDcUotMdze08cJz08s7P581ATdRmck=
    at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:892)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:557)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:355)
    at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:392)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:834)

我查看了这个错误,似乎我确实拥有存储桶的权限,因为我可以使用 AWS cli cp 命令将文件下载到我的本地计算机。 你能告诉我我在这里缺少什么吗?非常感谢!

【问题讨论】:

如果以下答案有帮助请告诉我 对不起,它没有 - 因为我不确定哪个配置导致 com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403 以上。我没有看到特定配置,因为您的代码可以工作而不是我的。我正在使用 AWS STS 使用临时凭证。 sessionToken 有什么用?你怎么知道的?可能是该令牌没有特权或不是有效的令牌。或类似的东西。为什么需要会话令牌? 我尝试删除会话令牌,但仍然得到相同的错误 你能打印出你从这些access_key = config.get("default", "aws_access_key_id")secret_key = config.get("default", "aws_secret_access_key")得到的值是多少 【参考方案1】:

看来问题出在您的配置上。我之前也遇到过类似的问题,但我有以下工作成功地保存了从 S3 存储桶获取数据的 abd。

from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, TimestampType

spark = SparkSession \
    .builder \
    .appName("SparkExample") \
    .getOrCreate()

spark_context = spark.sparkContext
spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", '<KEY>')
spark_context._jsc.hadoopConfiguration().set("fs.s3a.secret.key", 'SECRET_KEY')
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")

schema = StructType().add("_id", StringType()) \
    .add("employer", StringType()) \
    .add("created_at", TimestampType()) \
    .add("name", StringType())


employees = ['_id': 1,
              'employer': 'Microsoft',
              'created_at': datetime.now(),
              'name': 'Noel'
              ,
             '_id': 2,
              'employer': 'Apple',
              'created_at': datetime.now(),
              'name': 'Steve'
              ,
             ]
df = spark.createDataFrame(employees, schema=schema)

df.write \
    .format("json") \
    .mode("append") \
    .save("s3a://<YOUR BUCKET>/employeesjson")

collect = spark.read.format("json").load(
    "s3a://<YOUR BUCKET>/employeesjson").collect()
print(len(collect))

【讨论】:

【参考方案2】:

我能够通过将 spark 从使用 hadoop 2.7 更新到 hadoop 3.2 来解决此问题

【讨论】:

以上是关于无法使用本地 PySpark 从 S3 读取 json 文件的主要内容,如果未能解决你的问题,请参考以下文章

无法从 S3 读取 csv 到 AWS 上 EC2 实例上的 pyspark 数据帧

无法使用 Pyspark 2.4.4 读取 s3 存储桶中的镶木地板文件

使用 pyspark 从 s3 读取/加载 avro 文件

使用 pyspark 从 s3 位置读取镶木地板文件的文件夹到 pyspark 数据帧

使用 PySpark 从 Amazon S3 读取文本文件

使用 pyspark 从 S3 服务器读取时出错:[java.lang.IllegalArgumentException]