从 S3 通配符加载文件时引发错误
Posted
技术标签:
【中文标题】从 S3 通配符加载文件时引发错误【英文标题】:spark error loading files from S3 wildcard 【发布时间】:2016-01-07 18:52:37 【问题描述】:我正在使用 pyspark shell 并尝试使用 spark 的文件通配符功能从 S3 读取数据,但出现以下错误:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 1.2.0
/_/
Using Python version 2.7.6 (default, Jul 24 2015 16:07:07)
SparkContext available as sc.
>>> sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", 'AWS_ACCESS_KEY_ID')
>>> sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", 'AWS_SECRET_ACCESS_KEY')
>>> sc.textFile("s3n://myBucket/path/files-*", use_unicode=False).count()
16/01/07 18:03:02 INFO MemoryStore: ensureFreeSpace(37645) called with curMem=83944, maxMem=278019440
16/01/07 18:03:02 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 36.8 KB, free 265.0 MB)
16/01/07 18:03:02 INFO MemoryStore: ensureFreeSpace(5524) called with curMem=121589, maxMem=278019440
16/01/07 18:03:02 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 5.4 KB, free 265.0 MB)
16/01/07 18:03:02 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on salve1:48235 (size: 5.4 KB, free: 265.1 MB)
16/01/07 18:03:02 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
16/01/07 18:03:02 INFO SparkContext: Created broadcast 2 from textFile at NativeMethodAccessorImpl.java:-2
16/01/07 18:03:03 WARN RestS3Service: Response '/path' - Unexpected response code 404, expected 200
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/spark-1.2.0-bin-1.0.4/python/pyspark/rdd.py", line 819, in count
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/spark-1.2.0-bin-1.0.4/python/pyspark/rdd.py", line 810, in sum
return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
File "/spark-1.2.0-bin-1.0.4/python/pyspark/rdd.py", line 715, in reduce
vals = self.mapPartitions(func).collect()
File "/spark-1.2.0-bin-1.0.4/python/pyspark/rdd.py", line 676, in collect
bytesInJava = self._jrdd.collect().iterator()
File "/spark-1.2.0-bin-1.0.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/spark-1.2.0-bin-1.0.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o65.collect.
: org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException: Failed to sanitize XML document destined for handler class org.jets3t.service.impl.rest.XmlResponsesSaxParser$ListBucketHandler
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:197)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:166)
at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
at org.apache.hadoop.fs.s3native.$Proxy7.list(Unknown Source)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:375)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:842)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:902)
at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1032)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:987)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:177)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:57)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1352)
at org.apache.spark.rdd.RDD.collect(RDD.scala:780)
at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:309)
at org.apache.spark.api.java.JavaRDD.collect(JavaRDD.scala:32)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.jets3t.service.S3ServiceException: Failed to sanitize XML document destined for handler class org.jets3t.service.impl.rest.XmlResponsesSaxParser$ListBucketHandler
at org.jets3t.service.impl.rest.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:179)
at org.jets3t.service.impl.rest.XmlResponsesSaxParser.parseListBucketObjectsResponse(XmlResponsesSaxParser.java:198)
at org.jets3t.service.impl.rest.httpclient.RestS3Service.listObjectsInternal(RestS3Service.java:1090)
at org.jets3t.service.impl.rest.httpclient.RestS3Service.listObjectsChunkedImpl(RestS3Service.java:1056)
at org.jets3t.service.S3Service.listObjectsChunked(S3Service.java:1328)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:181)
... 44 more
Caused by: java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3332)
at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:569)
at java.lang.StringBuffer.append(StringBuffer.java:369)
at org.jets3t.service.impl.rest.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:160)
at org.jets3t.service.impl.rest.XmlResponsesSaxParser.parseListBucketObjectsResponse(XmlResponsesSaxParser.java:198)
at org.jets3t.service.impl.rest.httpclient.RestS3Service.listObjectsInternal(RestS3Service.java:1090)
at org.jets3t.service.impl.rest.httpclient.RestS3Service.listObjectsChunkedImpl(RestS3Service.java:1056)
at org.jets3t.service.S3Service.listObjectsChunked(S3Service.java:1328)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:181)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:166)
at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
at org.apache.hadoop.fs.s3native.$Proxy7.list(Unknown Source)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:375)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:842)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:902)
at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1032)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:987)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:177)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
当我尝试加载单个文件(不使用通配符)时,代码有效。由于我需要读取大约 100k 个文件,我想知道将所有文件加载到 RDD 中的最佳方法是什么。
更新
在我看来,问题在于我使用的密钥前缀在包含我所有文件的 s3“目录”中有超过 30 万个文件。我的文件以日期为后缀。
s3://myBucket/path/files-2016-01-01-02-00
s3://myBucket/path/files-2016-01-01-02-01
s3://myBucket/path/files-2016-01-01-03-00
s3://myBucket/path/files-2016-01-01-03-01
我试图使用通配符仅选择带有 s3n://myBucket/path/files-2016-01-01-03-*
的日期的一些文件当我打开调试日志记录时,我看到 spark 列出了 s3“目录”(s3://myBucket/path/
) 中的所有文件,而不是仅具有我指定的键前缀 (s3://myBucket/path/files-2016-01-01-03-
) 的文件。因此,即使我只尝试读取 2 个文件,所有 300k 文件都被列出,这可能是导致内存不足的原因。
【问题讨论】:
【参考方案1】:我直接从 S3 列出了我的文件,然后制作了一个包含确切文件名的 RDD,到目前为止它对我来说是有效的。
raw_file_list = subprocess.Popen("env AWS_ACCESS_KEY_ID="myId" AWS_SECRET_ACCESS_KEY="myKey" aws s3 ls s3://myBucket/path/files-2016-01-01-02", shell=True, stdout=subprocess.PIPE).stdout.read().strip().split('\n')
s3_file_list = sc.parallelize(raw_file_list).map(lambda line: "s3n://myBucket/path/%s" % line.split()[3]).collect()
rdd = sc.textFile(','.join(s3_file_list), use_unicode=False)
【讨论】:
【参考方案2】:这是内存不足的问题。因此,首先尝试将模式限制为更少的文件,看看它是否能解决问题。
【讨论】:
我尝试了一个只匹配两个文件的模式,但它仍然给出了同样的错误。此外,我尝试加载的两个文件的总大小为 150kb,这不会导致内存不足。【参考方案3】:Spark 在加载大量小文件时遇到了一个愚蠢的问题,因为它会为每个文件广播一些数据。这可能在几天前发布的 1.6.0 中得到修复。目前我希望您的代码正在加载每个文件并在后台将 RDD 联合在一起。
我使用的解决方案是将所有文件移动到加载到 S3 上的单个目录中,然后将其作为 glob 传递,例如:s3n://myBucket/path/input-files/*
。这样,就 spark 而言,您只加载单个路径,并且不会为该路径中的每个文件创建广播变量。
【讨论】:
以上是关于从 S3 通配符加载文件时引发错误的主要内容,如果未能解决你的问题,请参考以下文章
将 cloudfront 通配符 CNAME 子域路由到 s3 存储桶中的文件夹
读取镶木地板文件时,有没有办法在 basePath 中使用通配符?