Using S3 (Frankfurt) with Spark

Posted: 2016-04-15 12:23:31

Question: Is anyone using S3 in Frankfurt with Hadoop/Spark 1.6.0?
I am trying to store the results of a job on S3, with the dependencies declared as follows:
"org.apache.spark" %% "spark-core" % "1.6.0" exclude("org.apache.hadoop", "hadoop-client"),
"org.apache.spark" %% "spark-sql" % "1.6.0",
"org.apache.hadoop" % "hadoop-client" % "2.7.2",
"org.apache.hadoop" % "hadoop-aws" % "2.7.2"
I set the following configuration:
System.setProperty("com.amazonaws.services.s3.enableV4", "true")
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")
When I call saveAsTextFile on my RDD, it starts off fine and saves everything to S3. However, after a while, when it copies the output from _temporary to the final location, it produces this error:
Exception in thread "main" com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: XXXXXXXXXXXXXXXX, AWS Error Code: SignatureDoesNotMatch, AWS Error Message: The request signature we calculated does not match the signature you provided. Check your key and signing method., S3 Extended Request ID: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX=
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
at com.amazonaws.services.s3.AmazonS3Client.copyObject(AmazonS3Client.java:1507)
at com.amazonaws.services.s3.transfer.internal.CopyCallable.copyInOneChunk(CopyCallable.java:143)
at com.amazonaws.services.s3.transfer.internal.CopyCallable.call(CopyCallable.java:131)
at com.amazonaws.services.s3.transfer.internal.CopyMonitor.copy(CopyMonitor.java:189)
at com.amazonaws.services.s3.transfer.internal.CopyMonitor.call(CopyMonitor.java:134)
at com.amazonaws.services.s3.transfer.internal.CopyMonitor.call(CopyMonitor.java:46)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
If I use the hadoop-client bundled with the Spark package, it does not even start the transfer. The error occurs at random; sometimes it works and sometimes it doesn't.
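For reference, the setup described above amounts to roughly the following driver (a minimal sketch, not part of the original post; the app name, bucket, and credential handling via environment variables are placeholders):

// Minimal sketch of the setup described in the question (bucket and credentials are placeholders).
import org.apache.spark.{SparkConf, SparkContext}

object S3FrankfurtExample {
  def main(args: Array[String]): Unit = {
    // Signature V4 must be enabled in the AWS SDK before any S3 client is created.
    System.setProperty("com.amazonaws.services.s3.enableV4", "true")

    val sc = new SparkContext(new SparkConf().setAppName("s3-frankfurt-test"))

    // Frankfurt (eu-central-1) only accepts V4-signed requests, so point s3a at the regional endpoint.
    sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")
    sc.hadoopConfiguration.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
    sc.hadoopConfiguration.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))

    // The failure shows up while the output is copied from _temporary to the final path.
    sc.parallelize(Seq("a", "b", "c")).saveAsTextFile("s3a://your-bucket/output")
    sc.stop()
  }
}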
Comments:

Seems like a problem with your ssh key. Could you check that the key you are using is correct?
The data starts saving to S3; the error appears after a while.
@flaviotruzzi did you solve this?
@pangpang I used a custom function to save the data.
@flaviotruzzi - you said this was a dup and linked back to this page...?

Answer 1:

If you are using pyspark, the following worked for me:
import os

aws_profile = "your_profile"
aws_region = "eu-central-1"
s3_bucket = "your_bucket"
# see https://github.com/jupyter/docker-stacks/issues/127#issuecomment-214594895
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell"
# If this doesn't work you might have to delete your ~/.ivy2 directory to reset your package cache.
# (see https://github.com/databricks/spark-redshift/issues/244#issuecomment-239950148)
import pyspark
sc=pyspark.SparkContext()
# see https://github.com/databricks/spark-redshift/issues/298#issuecomment-271834485
sc.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")
# see https://***.com/questions/28844631/how-to-set-hadoop-configuration-values-from-pyspark
hadoop_conf=sc._jsc.hadoopConfiguration()
# see https://***.com/questions/43454117/how-do-you-use-s3a-with-spark-2-1-0-on-aws-us-east-2
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("com.amazonaws.services.s3.enableV4", "true")
hadoop_conf.set("fs.s3a.access.key", access_id)
hadoop_conf.set("fs.s3a.secret.key", access_key)
# see https://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region
hadoop_conf.set("fs.s3a.endpoint", "s3." + aws_region + ".amazonaws.com")
sql=pyspark.sql.SparkSession(sc)
path = s3_bucket + "your_file_on_s3"
dataS3=sql.read.parquet("s3a://" + path)
Comments:

When running this code from Zeppelin, you also need to set %spark.conf spark.executor.extraJavaOptions -Dcom.amazonaws.services.s3.enableV4
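For jobs submitted outside Zeppelin, the same flag can be passed to the executors through the Spark configuration; a sketch (not from the original discussion; the app name is a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

// Executors run in separate JVMs, so the V4 flag has to reach them via their JVM options;
// a driver-side System.setProperty call alone does not propagate to executors.
val conf = new SparkConf()
  .setAppName("s3-frankfurt")
  .set("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")

System.setProperty("com.amazonaws.services.s3.enableV4", "true") // driver-side JVM
val sc = new SparkContext(conf)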
Answer 2:

Try setting the following values:
System.setProperty("com.amazonaws.services.s3.enableV4", "true")
hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoopConf.set("com.amazonaws.services.s3.enableV4", "true")
hadoopConf.set("fs.s3a.endpoint", "s3." + region + ".amazonaws.com")
Set the region the bucket is located in; in my case: eu-central-1
Then add the dependency with Gradle or however you manage dependencies:
dependencies {
    compile 'org.apache.hadoop:hadoop-aws:2.7.2'
}
Hope it helps.
Answer 3:

Inspired by the other answers, running the following directly in the pyspark shell produced the desired output for me:
sc.setSystemProperty("com.amazonaws.services.s3.enableV4", "true") # fails without this
hc=sc._jsc.hadoopConfiguration()
hc.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hc.set("com.amazonaws.services.s3.enableV4", "true")
hc.set("fs.s3a.endpoint", end_point)
hc.set("fs.s3a.access.key",access_key)
hc.set("fs.s3a.secret.key",secret_key)
data = sc.textFile("s3a://bucket/file")
data.take(3)
Pick your endpoint from the list of endpoints. I was able to fetch data from Asia Pacific (Mumbai) (ap-south-1), which is a Version 4-only region.