如何从 Apache Spark 访问 s3a:// 文件?

Posted

技术标签:

【中文标题】如何从 Apache Spark 访问 s3a:// 文件?【英文标题】:How to access s3a:// files from Apache Spark? 【发布时间】:2015-08-03 20:15:11 【问题描述】:

Hadoop 2.6 不支持开箱即用的 s3a,因此我尝试了一系列解决方案和修复,包括:

使用 hadoop-aws 和 aws-java-sdk 部署 => 无法读取凭据的环境变量 将 hadoop-aws 添加到 maven => 各种传递依赖冲突

有没有人成功地使两者都工作?

【问题讨论】:

您使用的是哪个版本的 Apache Spark? 相关:SPARK-7442 1.3.1_scala 2.10.4_hadoop 2.6。我刚刚发现 s3:// 和 s3n:// 也不能开箱即用(它们只适用于 hadoop 2.4) 【参考方案1】:

亲身体验了 s3a 和 s3n 之间的差异 - 在 s3a 上传输的 7.9GB 数据大约需要 7 分钟,而在 s3n 上传输 7.9GB 数据需要 73 分钟 [us-east-1 到 us-west-1 不幸的是两种情况; Redshift 和 Lambda 在这个时候是 us-east-1] 这是堆栈中非常重要的一部分,要正确,值得沮丧。

以下是截至 2015 年 12 月的关键部分:

    您的 Spark 集群需要 Hadoop 2.x 或更高版本。如果您使用 Spark EC2 设置脚本并且可能错过了它,则使用 1.0 以外的其他内容的切换是指定 --hadoop-major-version 2(在撰写本文时使用 CDH 4.2)。

    您需要为 2.7.1(稳定版)的 Hadoop 版本添加乍一看似乎已过时的 AWS 开发工具包库(2014 年构建的版本 1.7.4):aws-java -SDK 1.7.4。据我所知,将它与 1.10.8 的特定 AWS 开发工具包 JAR 一起使用并没有破坏任何东西。

    您还需要类路径中的 hadoop-aws 2.7.1 JAR。此 JAR 包含类 org.apache.hadoop.fs.s3a.S3AFileSystem

    spark.properties 中,您可能需要一些如下所示的设置:

    spark.hadoop.fs.s3a.access.key=ACCESSKEY spark.hadoop.fs.s3a.secret.key=SECRETKEY

    如果您使用带有 spark 的 hadoop 2.7 版本,则 aws 客户端使用 V2 作为默认身份验证签名。并且所有新的 aws 区域仅支持 V4 协议。要使用 V4,请在 spark-submit 中传递这些 conf,还必须指定端点(格式 - s3.<region>.amazonaws.com)。

--conf "spark.executor.extraJavaOptions=-Dcom.amazonaws.services.s3.enableV4=true

--conf "spark.driver.extraJavaOptions=-Dcom.amazonaws.services.s3.enableV4=true

我已经在post I wrote 上更详细地详细说明了这个列表,因为我正在努力完成这个过程。此外,我还介绍了我在此过程中遇到的所有异常情况,以及我认为是什么原因以及如何解决它们。

【讨论】:

这对我很有帮助。我最终添加的唯一依赖项是 mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws/… 的 "org.apache.hadoop" % "hadoop-aws" % "3.0.0-alpha2" 在类路径上有hadoop-aws 2.7.1(或更高版本)JAR 为我解决了这个问题,但是在 Amazon EMR 上运行时我不需要这个,所以我将它作为提供的依赖项,我的 sbt 看起来像 @ 987654331@ 抱歉没有看到重大更新,因为这是一个非常古老的问题,尽管没有验证自己,但标记为已接受【参考方案2】:

我写这个答案是为了在 Hadoop 2.7.3

上使用 S3ASpark 2.0.1 访问文件

复制默认随 Hadoop 提供的 AWS jars(hadoop-aws-2.7.3.jaraws-java-sdk-1.7.4.jar

提示:如果 jar 位置不确定?以特权用户身份运行 find 命令会很有帮助;命令可以是

  find / -name hadoop-aws*.jar
  find / -name aws-java-sdk*.jar

进入 spark 类路径,其中包含所有 spark jars

提示:我们不能直接指向位置(它必须在属性文件中),因为我想为发行版和 Linux 风格提供通用答案。 spark classpath 可以通过下面的 find 命令识别

  find / -name spark-core*.jar

spark-defaults.conf

提示:(大部分会放在/etc/spark/conf/spark-defaults.conf

#make sure jars are added to CLASSPATH
spark.yarn.jars=file://spark/home/dir/jars/*.jar,file://hadoop/install/dir/share/hadoop/tools/lib/*.jar


spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem  
spark.hadoop.fs.s3a.access.key=s3a.access.key 
spark.hadoop.fs.s3a.secret.key=s3a.secret.key 
#you can set above 3 properties in hadoop level `core-site.xml` as well by removing spark prefix.

如果需要,在 spark 提交中包括 --driver-class-path 中的 jars(aws-java-sdkhadoop-aws)。

spark-submit --master yarn \
  --driver-class-path spark/jars/home/dir/aws-java-sdk-1.7.4.jar \
  --driver-class-path spark/jars/home/dir/hadoop-aws-2.7.3.jar \
  other options

注意:

确保 Linux 用户具有读取权限,然后再运行 find 命令防止错误权限被拒绝

【讨论】:

你可以通过 wget central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/… wget central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/…得到这些 Zeppelin Spark 2.2 版,我能够连接到仅支持 V4 签名的 S3 中心区域。设置以下属性以及 mrsrinivas 建议的罐子(工件)。 ``` System.setProperty("com.amazonaws.services.s3.enableV4", "true") val hadoopConf = sc.hadoopConfiguration hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs. s3a.S3AFileSystem") hadoopConf.set("fs.s3a.endpoint", "s3.ca-central-1.amazonaws.com") hadoopConf.set("fs.s3a.access.key", accessKey) hadoopConf.set (“fs.s3a.secret.key”,secretKey)```【参考方案3】:

我使用 Spark 1.4.1 预构建的二进制文件和 hadoop 2.6 让它工作 确保将 spark.driver.extraClassPathspark.executor.extraClassPath 都设置为指向两个 jar(hadoop-aws 和 aws-java-sdk) 如果您在集群上运行,请确保您的执行程序可以访问集群上的 jar 文件。

【讨论】:

同样的问题:org.apache.spark.SparkException:作业因阶段失败而中止:阶段 1.0 中的任务 3 失败 4 次,最近一次失败:阶段 1.0 中丢失任务 3.3 (TID 27, 10.122 .113.63): java.io.IOException: No FileSystem for scheme: s3n 如果它是所有 s3 的默认值,请在 $SPARK_HOME/conf/spark-defaults.conf 中添加两个变量。 Ref deploymentzone.com/2015/12/20/s3a-on-spark-on-aws-ec2 是一个很好的来源。【参考方案4】:

我们在 Mesos 中使用 spark 1.6.1,并且在从 spark 写入 S3 时遇到了很多问题。我感谢 cfeduke 的答案。我所做的微小更改是将 maven 坐标添加到 spark-defaults.conf 文件中的 spark.jar 配置。我尝试使用 hadoop-aws:2.7.2 但仍然遇到很多错误,所以我们回到了 2.7.1。以下是 spark-defaults.conf 中对我们有用的更改:

spark.jars.packages             net.java.dev.jets3t:jets3t:0.9.0,com.google.guava:guava:16.0.1,com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.1
spark.hadoop.fs.s3a.access.key  <MY ACCESS KEY>
spark.hadoop.fs.s3a.secret.key  <MY SECRET KEY>
spark.hadoop.fs.s3a.fast.upload true

感谢 cfeduke 抽出宝贵时间撰写您的帖子。很有帮助。

【讨论】:

【参考方案5】:

以下是截至 2016 年 10 月的详细信息,在 Spark 欧盟峰会上公布:Apache Spark and Object Stores。

关键点

由于数据损坏的风险/经验,直接输出提交程序已从 Spark 2.0 中删除。 FileOutputCommitter 上有一些设置可以减少重命名,但不会消除它们 我正在与一些同事一起做一个 O(1) 提交程序,依靠 Apache Dynamo 为我们提供所需的一致性。 要使用 S3a,请正确设置类路径。 在 Hadoop 2.7.z 上; 2.6.x 有一些问题,当时 HADOOP-11571 已解决。 在 SPARK-7481 下有一个 PR,可以将所有内容整合到您自己构建的 spark 发行版中。否则,请让提供二进制文件的人来完成这项工作。 Hadoop 2.8 将增加主要的性能改进HADOOP-11694。

产品放置:HADOOP-11694 的读取性能方面包含在 HDP2.5 中; Spark and S3 documentation 可能会引起人们的兴趣——尤其是调整选项。

【讨论】:

【参考方案6】:

使用 Hadoop 2.6 预构建的 Spark 1.4.1,我可以通过添加来自 Hadoop 的 hadoop-aws 和 aws-java-sdk jar 文件来让 s3a:// 在部署到 Spark Standalone 集群时工作2.7.1 发行版(位于 Hadoop 2.7.1 的 $HADOOP_HOME/share/hadoop/tools/lib 下)到我的 $SPARK_HOME/conf/spark-env.sh 文件中的 SPARK_CLASSPATH 环境变量。

【讨论】:

真的吗?让我在 1.4.1 上再次尝试您的解决方案,因为issues.apache.org/jira/browse/SPARK-7442 仍被标记为“未解决”,所以我没有致力于 s3a。 我试过了,好像缺少其他东西,我一直收到这个错误:org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 2.0 failed 4 次,最近一次失败:在阶段 2.0 中丢失任务 3.3(TID 47、10.122.113.63):java.io.IOException:没有用于方案的文件系统:s3n 哦,这是您的解决方案中的弃用冲突:检测到 |SPARK_CLASSPATH(设置为“$value”)。 |这在 Spark 1.0+ 中已被弃用。 | |请改用:| - ./spark-submit 和 --driver-class-path 来扩充驱动程序类路径 | - spark.executor.extraClassPath 增加执行器类路径 具体来说,即使到 2015 年 12 月 31 日,您也需要使用 2014 年编译的 AWS 开发工具包库:aws-java-sdk 1.7.4;上面这个答案是这个问题上最准确的答案。【参考方案7】:

正如你所说,hadoop 2.6 不支持s3a,最新的spark release 1.6.1 也不支持hadoop 2.7,但是spark 2.0 对hadoop 2.7 和s3a 绝对没有问题。

对于 spark 1.6.x,我们使用 EMR 的 s3 驱动程序做了一些肮脏的修改...您可以查看此文档:https://github.com/zalando/spark-appliance#emrfs-support

如果你还想尝试在 spark 1.6.x 中使用 s3a,请参考这里的答案:https://***.com/a/37487407/5630352

【讨论】:

【参考方案8】:

您还可以使用 spark-defaults.conf 将 S3A 依赖项添加到类路径中。

例子:

spark.driver.extraClassPath     /usr/local/spark/jars/hadoop-aws-2.7.5.jar
spark.executor.extraClassPath   /usr/local/spark/jars/hadoop-aws-2.7.5.jar
spark.driver.extraClassPath     /usr/local/spark/jars/aws-java-sdk-1.7.4.jar
spark.executor.extraClassPath   /usr/local/spark/jars/aws-java-sdk-1.7.4.jar

或者只是:

spark.jars     /usr/local/spark/jars/hadoop-aws-2.7.5.jar,/usr/local/spark/jars/aws-java-sdk-1.7.4.jar

只需确保您的 AWS 开发工具包版本与 Hadoop 版本相匹配。有关此的更多信息,请查看此答案:Unable to access S3 data using Spark 2.2

【讨论】:

【参考方案9】:

这是 pyspark 的解决方案(可能带有代理):

def _configure_s3_protocol(spark, proxy=props["proxy"]["host"], port=props["proxy"]["port"], endpoint=props["s3endpoint"]["irland"]):
    """
    Configure access to the protocol s3
    https://sparkour.urizone.net/recipes/using-s3/
    AWS Regions and Endpoints
    https://docs.aws.amazon.com/general/latest/gr/rande.html
    """
    sc = spark.sparkContext
    sc._jsc.hadoopConfiguration().set("fs.s3a.access.key",  os.environ.get("AWS_ACCESS_KEY_ID"))
    sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", os.environ.get("AWS_SECRET_ACCESS_KEY"))
    sc._jsc.hadoopConfiguration().set("fs.s3a.proxy.host", proxy)
    sc._jsc.hadoopConfiguration().set("fs.s3a.proxy.port", port)
    sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", endpoint)
    return spark

【讨论】:

props 变量是什么? 一个普通的python字典【参考方案10】:

这是一个 scala 版本,可以与 Hadoop 3.3.1Spark 3.2.1(预构建)一起正常工作,访问来自非 AWS 机器的 S3 存储桶[通常是开发人员机器上的本地设置]

sbt

 libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "3.2.1" % "provided",
    "org.apache.spark" %% "spark-streaming" % "3.2.1" % "provided",
    "org.apache.spark" %% "spark-sql" % "3.2.1" % "provided",
    "org.apache.hadoop" % "hadoop-aws" % "3.3.1",
    "org.apache.hadoop" % "hadoop-common" % "3.3.1" % "provided"
  )

火花程序

  val spark = SparkSession
    .builder()
    .master("local")
    .appName("Process parquet file")
    .config("spark.hadoop.fs.s3a.path.style.access", true)
    .config("spark.hadoop.fs.s3a.access.key", ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", SECRET_KEY)
    .config("spark.hadoop.fs.s3a.endpoint", ENDPOINT)
    .config(
      "spark.hadoop.fs.s3a.impl",
      "org.apache.hadoop.fs.s3a.S3AFileSystem"
    )
    // The enable V4 does not seem necessary for the eu-west-3 region
    // see @stevel comment below 
    // .config("com.amazonaws.services.s3.enableV4", true)
    // .config(
    //  "spark.driver.extraJavaOptions",
    //  "-Dcom.amazonaws.services.s3.enableV4=true"
    // )
    .config("spark.executor.instances", "4")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  val df = spark.read.parquet("s3a://[BUCKET NAME]/.../???.parquet")
  df.show()

注意:区域格式为s3.[REGION].amazonaws.com,例如s3.eu-west-3.amazonaws.com

s3 配置

要使存储桶在 AWS 外部可用,请添加以下形式的存储桶策略:


    "Version": "2012-10-17",
    "Statement": [
        
            "Sid": "Statement1",
            "Effect": "Allow",
            "Principal": 
                "AWS": "arn:aws:iam::[ACCOUNT ID]:user/[IAM USERNAME]"
            ,
            "Action": [
                "s3:Delete*",
                "s3:Get*",
                "s3:List*",
                "s3:PutObject"
            ],
            "Resource": "arn:aws:s3:::[BUCKET NAME]/*"
        
    ]

提供给 spark 配置的 ACCESS_KEY 和 SECRET_KEY 必须是存储桶上配置的 IAM 用户的那些

【讨论】:

看起来不错,但您不需要 - fs.s3a.impl 声明。这是仅在堆栈溢出示例中发现的迷信 - com.amazonaws.services.s3.enableV4=true sysprop。这仅适用于较旧的 aws sdk 版本。 @stevel 谢谢 - 然后将编辑答案【参考方案11】:

我使用的是 spark 2.3 版,当我使用 spark 保存数据集时:

dataset.write().format("hive").option("fileFormat", "orc").mode(SaveMode.Overwrite)
    .option("path", "s3://reporting/default/temp/job_application")
    .saveAsTable("job_application");

它完美运行并将我的数据保存到 s3 中。

【讨论】:

如果您使用的是“s3”,那么您使用的是 Amazon EMR,因此不相关。在没有失败和可观察到的不一致的情况下,它对您有用。您不能依赖于生产中的工作,因此 Hadoop 3.1 的 S3A 提交者

以上是关于如何从 Apache Spark 访问 s3a:// 文件?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark Submit 中将 s3a 与 Apache spark 2.2(hadoop 2.8) 一起使用?

Apache Spark Hadoop S3A SignatureDoesNotMatch

Spark + s3 - 错误 - java.lang.ClassNotFoundException:找不到类 org.apache.hadoop.fs.s3a.S3AFileSystem

Apache Spark s3a 提交者 - 线程堆栈 - 内存不足问题

使用 Spark 访问 s3a 时出现 403 错误

PySpark:AWS s3n 正在工作,但 s3a 没有