如何从 Apache Spark 访问 s3a:// 文件?
Posted
技术标签:
【中文标题】如何从 Apache Spark 访问 s3a:// 文件?【英文标题】:How to access s3a:// files from Apache Spark? 【发布时间】:2015-08-03 20:15:11 【问题描述】:Hadoop 2.6 不支持开箱即用的 s3a,因此我尝试了一系列解决方案和修复,包括:
使用 hadoop-aws 和 aws-java-sdk 部署 => 无法读取凭据的环境变量 将 hadoop-aws 添加到 maven => 各种传递依赖冲突
有没有人成功地使两者都工作?
【问题讨论】:
您使用的是哪个版本的 Apache Spark? 相关:SPARK-7442 1.3.1_scala 2.10.4_hadoop 2.6。我刚刚发现 s3:// 和 s3n:// 也不能开箱即用(它们只适用于 hadoop 2.4) 【参考方案1】:亲身体验了 s3a 和 s3n 之间的差异 - 在 s3a 上传输的 7.9GB 数据大约需要 7 分钟,而在 s3n 上传输 7.9GB 数据需要 73 分钟 [us-east-1 到 us-west-1 不幸的是两种情况; Redshift 和 Lambda 在这个时候是 us-east-1] 这是堆栈中非常重要的一部分,要正确,值得沮丧。
以下是截至 2015 年 12 月的关键部分:
您的 Spark 集群需要 Hadoop 2.x 或更高版本。如果您使用 Spark EC2 设置脚本并且可能错过了它,则使用 1.0 以外的其他内容的切换是指定 --hadoop-major-version 2
(在撰写本文时使用 CDH 4.2)。
您需要为 2.7.1(稳定版)的 Hadoop 版本添加乍一看似乎已过时的 AWS 开发工具包库(2014 年构建的版本 1.7.4):aws-java -SDK 1.7.4。据我所知,将它与 1.10.8 的特定 AWS 开发工具包 JAR 一起使用并没有破坏任何东西。
您还需要类路径中的 hadoop-aws 2.7.1 JAR。此 JAR 包含类 org.apache.hadoop.fs.s3a.S3AFileSystem
。
在spark.properties
中,您可能需要一些如下所示的设置:
spark.hadoop.fs.s3a.access.key=ACCESSKEY spark.hadoop.fs.s3a.secret.key=SECRETKEY
如果您使用带有 spark 的 hadoop 2.7 版本,则 aws 客户端使用 V2 作为默认身份验证签名。并且所有新的 aws 区域仅支持 V4 协议。要使用 V4,请在 spark-submit 中传递这些 conf,还必须指定端点(格式 - s3.<region>.amazonaws.com
)。
--conf "spark.executor.extraJavaOptions=-Dcom.amazonaws.services.s3.enableV4=true
--conf "spark.driver.extraJavaOptions=-Dcom.amazonaws.services.s3.enableV4=true
我已经在post I wrote 上更详细地详细说明了这个列表,因为我正在努力完成这个过程。此外,我还介绍了我在此过程中遇到的所有异常情况,以及我认为是什么原因以及如何解决它们。
【讨论】:
这对我很有帮助。我最终添加的唯一依赖项是 mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws/… 的 "org.apache.hadoop" % "hadoop-aws" % "3.0.0-alpha2" 在类路径上有hadoop-aws 2.7.1
(或更高版本)JAR 为我解决了这个问题,但是在 Amazon EMR 上运行时我不需要这个,所以我将它作为提供的依赖项,我的 sbt 看起来像 @ 987654331@
抱歉没有看到重大更新,因为这是一个非常古老的问题,尽管没有验证自己,但标记为已接受【参考方案2】:
我写这个答案是为了在 Hadoop 2.7.3
上使用 S3A 从 Spark 2.0.1 访问文件复制默认随 Hadoop 提供的 AWS jars(hadoop-aws-2.7.3.jar
和 aws-java-sdk-1.7.4.jar
)
提示:如果 jar 位置不确定?以特权用户身份运行 find 命令会很有帮助;命令可以是
find / -name hadoop-aws*.jar
find / -name aws-java-sdk*.jar
进入 spark 类路径,其中包含所有 spark jars
提示:我们不能直接指向位置(它必须在属性文件中),因为我想为发行版和 Linux 风格提供通用答案。 spark classpath 可以通过下面的 find 命令识别
find / -name spark-core*.jar
在spark-defaults.conf
提示:(大部分会放在/etc/spark/conf/spark-defaults.conf
)
#make sure jars are added to CLASSPATH
spark.yarn.jars=file://spark/home/dir/jars/*.jar,file://hadoop/install/dir/share/hadoop/tools/lib/*.jar
spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.access.key=s3a.access.key
spark.hadoop.fs.s3a.secret.key=s3a.secret.key
#you can set above 3 properties in hadoop level `core-site.xml` as well by removing spark prefix.
如果需要,在 spark 提交中包括 --driver-class-path
中的 jars(aws-java-sdk
和 hadoop-aws
)。
spark-submit --master yarn \
--driver-class-path spark/jars/home/dir/aws-java-sdk-1.7.4.jar \
--driver-class-path spark/jars/home/dir/hadoop-aws-2.7.3.jar \
other options
注意:
确保 Linux 用户具有读取权限,然后再运行
find
命令防止错误权限被拒绝
【讨论】:
你可以通过 wget central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/… wget central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/…得到这些 Zeppelin Spark 2.2 版,我能够连接到仅支持 V4 签名的 S3 中心区域。设置以下属性以及 mrsrinivas 建议的罐子(工件)。 ``` System.setProperty("com.amazonaws.services.s3.enableV4", "true") val hadoopConf = sc.hadoopConfiguration hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs. s3a.S3AFileSystem") hadoopConf.set("fs.s3a.endpoint", "s3.ca-central-1.amazonaws.com") hadoopConf.set("fs.s3a.access.key", accessKey) hadoopConf.set (“fs.s3a.secret.key”,secretKey)```【参考方案3】:我使用 Spark 1.4.1 预构建的二进制文件和 hadoop 2.6 让它工作
确保将 spark.driver.extraClassPath
和 spark.executor.extraClassPath
都设置为指向两个 jar(hadoop-aws 和 aws-java-sdk)
如果您在集群上运行,请确保您的执行程序可以访问集群上的 jar 文件。
【讨论】:
同样的问题:org.apache.spark.SparkException:作业因阶段失败而中止:阶段 1.0 中的任务 3 失败 4 次,最近一次失败:阶段 1.0 中丢失任务 3.3 (TID 27, 10.122 .113.63): java.io.IOException: No FileSystem for scheme: s3n 如果它是所有 s3 的默认值,请在 $SPARK_HOME/conf/spark-defaults.conf 中添加两个变量。 Ref deploymentzone.com/2015/12/20/s3a-on-spark-on-aws-ec2 是一个很好的来源。【参考方案4】:我们在 Mesos 中使用 spark 1.6.1,并且在从 spark 写入 S3 时遇到了很多问题。我感谢 cfeduke 的答案。我所做的微小更改是将 maven 坐标添加到 spark-defaults.conf 文件中的 spark.jar 配置。我尝试使用 hadoop-aws:2.7.2 但仍然遇到很多错误,所以我们回到了 2.7.1。以下是 spark-defaults.conf 中对我们有用的更改:
spark.jars.packages net.java.dev.jets3t:jets3t:0.9.0,com.google.guava:guava:16.0.1,com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.1
spark.hadoop.fs.s3a.access.key <MY ACCESS KEY>
spark.hadoop.fs.s3a.secret.key <MY SECRET KEY>
spark.hadoop.fs.s3a.fast.upload true
感谢 cfeduke 抽出宝贵时间撰写您的帖子。很有帮助。
【讨论】:
【参考方案5】:以下是截至 2016 年 10 月的详细信息,在 Spark 欧盟峰会上公布:Apache Spark and Object Stores。
关键点
由于数据损坏的风险/经验,直接输出提交程序已从 Spark 2.0 中删除。 FileOutputCommitter 上有一些设置可以减少重命名,但不会消除它们 我正在与一些同事一起做一个 O(1) 提交程序,依靠 Apache Dynamo 为我们提供所需的一致性。 要使用 S3a,请正确设置类路径。 在 Hadoop 2.7.z 上; 2.6.x 有一些问题,当时 HADOOP-11571 已解决。 在 SPARK-7481 下有一个 PR,可以将所有内容整合到您自己构建的 spark 发行版中。否则,请让提供二进制文件的人来完成这项工作。 Hadoop 2.8 将增加主要的性能改进HADOOP-11694。产品放置:HADOOP-11694 的读取性能方面包含在 HDP2.5 中; Spark and S3 documentation 可能会引起人们的兴趣——尤其是调整选项。
【讨论】:
【参考方案6】:使用 Hadoop 2.6 预构建的 Spark 1.4.1,我可以通过添加来自 Hadoop 的 hadoop-aws 和 aws-java-sdk jar 文件来让 s3a:// 在部署到 Spark Standalone 集群时工作2.7.1 发行版(位于 Hadoop 2.7.1 的 $HADOOP_HOME/share/hadoop/tools/lib 下)到我的 $SPARK_HOME/conf/spark-env.sh 文件中的 SPARK_CLASSPATH 环境变量。
【讨论】:
真的吗?让我在 1.4.1 上再次尝试您的解决方案,因为issues.apache.org/jira/browse/SPARK-7442 仍被标记为“未解决”,所以我没有致力于 s3a。 我试过了,好像缺少其他东西,我一直收到这个错误:org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 2.0 failed 4 次,最近一次失败:在阶段 2.0 中丢失任务 3.3(TID 47、10.122.113.63):java.io.IOException:没有用于方案的文件系统:s3n 哦,这是您的解决方案中的弃用冲突:检测到 |SPARK_CLASSPATH(设置为“$value”)。 |这在 Spark 1.0+ 中已被弃用。 | |请改用:| - ./spark-submit 和 --driver-class-path 来扩充驱动程序类路径 | - spark.executor.extraClassPath 增加执行器类路径 具体来说,即使到 2015 年 12 月 31 日,您也需要使用 2014 年编译的 AWS 开发工具包库:aws-java-sdk 1.7.4;上面这个答案是这个问题上最准确的答案。【参考方案7】:正如你所说,hadoop 2.6 不支持s3a,最新的spark release 1.6.1 也不支持hadoop 2.7,但是spark 2.0 对hadoop 2.7 和s3a 绝对没有问题。
对于 spark 1.6.x,我们使用 EMR 的 s3 驱动程序做了一些肮脏的修改...您可以查看此文档:https://github.com/zalando/spark-appliance#emrfs-support
如果你还想尝试在 spark 1.6.x 中使用 s3a,请参考这里的答案:https://***.com/a/37487407/5630352
【讨论】:
【参考方案8】:您还可以使用 spark-defaults.conf
将 S3A 依赖项添加到类路径中。
例子:
spark.driver.extraClassPath /usr/local/spark/jars/hadoop-aws-2.7.5.jar
spark.executor.extraClassPath /usr/local/spark/jars/hadoop-aws-2.7.5.jar
spark.driver.extraClassPath /usr/local/spark/jars/aws-java-sdk-1.7.4.jar
spark.executor.extraClassPath /usr/local/spark/jars/aws-java-sdk-1.7.4.jar
或者只是:
spark.jars /usr/local/spark/jars/hadoop-aws-2.7.5.jar,/usr/local/spark/jars/aws-java-sdk-1.7.4.jar
只需确保您的 AWS 开发工具包版本与 Hadoop 版本相匹配。有关此的更多信息,请查看此答案:Unable to access S3 data using Spark 2.2
【讨论】:
【参考方案9】:这是 pyspark 的解决方案(可能带有代理):
def _configure_s3_protocol(spark, proxy=props["proxy"]["host"], port=props["proxy"]["port"], endpoint=props["s3endpoint"]["irland"]):
"""
Configure access to the protocol s3
https://sparkour.urizone.net/recipes/using-s3/
AWS Regions and Endpoints
https://docs.aws.amazon.com/general/latest/gr/rande.html
"""
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.environ.get("AWS_ACCESS_KEY_ID"))
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", os.environ.get("AWS_SECRET_ACCESS_KEY"))
sc._jsc.hadoopConfiguration().set("fs.s3a.proxy.host", proxy)
sc._jsc.hadoopConfiguration().set("fs.s3a.proxy.port", port)
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", endpoint)
return spark
【讨论】:
props
变量是什么?
一个普通的python字典【参考方案10】:
这是一个 scala 版本,可以与 Hadoop 3.3.1 的 Spark 3.2.1(预构建)一起正常工作,访问来自非 AWS 机器的 S3 存储桶[通常是开发人员机器上的本地设置]
sbt
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "3.2.1" % "provided",
"org.apache.spark" %% "spark-streaming" % "3.2.1" % "provided",
"org.apache.spark" %% "spark-sql" % "3.2.1" % "provided",
"org.apache.hadoop" % "hadoop-aws" % "3.3.1",
"org.apache.hadoop" % "hadoop-common" % "3.3.1" % "provided"
)
火花程序
val spark = SparkSession
.builder()
.master("local")
.appName("Process parquet file")
.config("spark.hadoop.fs.s3a.path.style.access", true)
.config("spark.hadoop.fs.s3a.access.key", ACCESS_KEY)
.config("spark.hadoop.fs.s3a.secret.key", SECRET_KEY)
.config("spark.hadoop.fs.s3a.endpoint", ENDPOINT)
.config(
"spark.hadoop.fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem"
)
// The enable V4 does not seem necessary for the eu-west-3 region
// see @stevel comment below
// .config("com.amazonaws.services.s3.enableV4", true)
// .config(
// "spark.driver.extraJavaOptions",
// "-Dcom.amazonaws.services.s3.enableV4=true"
// )
.config("spark.executor.instances", "4")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
val df = spark.read.parquet("s3a://[BUCKET NAME]/.../???.parquet")
df.show()
注意:区域格式为s3.[REGION].amazonaws.com
,例如s3.eu-west-3.amazonaws.com
s3 配置
要使存储桶在 AWS 外部可用,请添加以下形式的存储桶策略:
"Version": "2012-10-17",
"Statement": [
"Sid": "Statement1",
"Effect": "Allow",
"Principal":
"AWS": "arn:aws:iam::[ACCOUNT ID]:user/[IAM USERNAME]"
,
"Action": [
"s3:Delete*",
"s3:Get*",
"s3:List*",
"s3:PutObject"
],
"Resource": "arn:aws:s3:::[BUCKET NAME]/*"
]
提供给 spark 配置的 ACCESS_KEY 和 SECRET_KEY 必须是存储桶上配置的 IAM 用户的那些
【讨论】:
看起来不错,但您不需要 - fs.s3a.impl 声明。这是仅在堆栈溢出示例中发现的迷信 - com.amazonaws.services.s3.enableV4=true sysprop。这仅适用于较旧的 aws sdk 版本。 @stevel 谢谢 - 然后将编辑答案【参考方案11】:我使用的是 spark 2.3 版,当我使用 spark 保存数据集时:
dataset.write().format("hive").option("fileFormat", "orc").mode(SaveMode.Overwrite)
.option("path", "s3://reporting/default/temp/job_application")
.saveAsTable("job_application");
它完美运行并将我的数据保存到 s3 中。
【讨论】:
如果您使用的是“s3”,那么您使用的是 Amazon EMR,因此不相关。在没有失败和可观察到的不一致的情况下,它对您有用。您不能依赖于生产中的工作,因此 Hadoop 3.1 的 S3A 提交者以上是关于如何从 Apache Spark 访问 s3a:// 文件?的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Spark Submit 中将 s3a 与 Apache spark 2.2(hadoop 2.8) 一起使用?
Apache Spark Hadoop S3A SignatureDoesNotMatch
Spark + s3 - 错误 - java.lang.ClassNotFoundException:找不到类 org.apache.hadoop.fs.s3a.S3AFileSystem