通过 EMR 写入 s3a 时出现 OutOfMemory 错误

Posted

技术标签:

【中文标题】通过 EMR 写入 s3a 时出现 OutOfMemory 错误【英文标题】:OutOfMemory error when writing to s3a through EMR 【发布时间】:2020-08-29 18:29:33 【问题描述】:

获取以下 PySpark 代码的 OutOfMemory 错误:(在写入一定数量的行后失败。如果我尝试写入 hadoop 文件系统而不是使用 s3a,则不会发生这种情况,所以我认为我已经缩小了范围归结为 s3a 的问题。) - 写入 s3a 的最终目标。 想知道是否有一个最佳的 s3a 配置,我不会用完超大表的内存。

df = spark.sql("SELECT * FROM my_big_table")
df.repartition(1).write.option("header", "true").csv("s3a://mycsvlocation/folder/")

我的 s3a 配置(emr 默认):

('fs.s3a.attempts.maximum', '10')
('fs.s3a.buffer.dir', '$hadoop.tmp.dir/s3a')
('fs.s3a.connection.establish.timeout', '5000')
('fs.s3a.connection.maximum', '15')
('fs.s3a.connection.ssl.enabled', 'true')
('fs.s3a.connection.timeout', '50000')
('fs.s3a.fast.buffer.size', '1048576')
('fs.s3a.fast.upload', 'true')
('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
('fs.s3a.max.total.tasks', '1000')
('fs.s3a.multipart.purge', 'false')
('fs.s3a.multipart.purge.age', '86400')
('fs.s3a.multipart.size', '104857600')
('fs.s3a.multipart.threshold', '2147483647')
('fs.s3a.paging.maximum', '5000')
('fs.s3a.threads.core', '15')
('fs.s3a.threads.keepalivetime', '60')
('fs.s3a.threads.max', '256')
('mapreduce.fileoutputcommitter.algorithm.version', '2')
('spark.authenticate', 'true')
('spark.network.crypto.enabled', 'true')
('spark.network.crypto.saslFallback', 'true')
('spark.speculation', 'false')

堆栈跟踪的基础:

Caused by: java.lang.OutOfMemoryError
        at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at org.apache.hadoop.fs.s3a.S3AFastOutputStream.write(S3AFastOutputStream.java:194)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:60)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
        at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
        at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
        at java.io.OutputStreamWriter.write(OutputStreamWriter.java:207)
        at com.univocity.parsers.common.input.WriterCharAppender.writeCharsAndReset(WriterCharAppender.java:152)
        at com.univocity.parsers.common.AbstractWriter.writeRow(AbstractWriter.java:808)
        ... 16 more

【问题讨论】:

我猜这是因为repartition(1),因为所有负载都归于一名工人。您可以尝试不使用repartition 并确认它是否正常工作吗?此外,如果您能够在 hdfs 中写入,则可以使用 s3distcphdfs 复制到 s3 spark 对于多个文件而不是单个文件要好得多。 EMR 不支持S3A。使用S3。 Aws doc是的,你可以使用它,但可能会出现很多问题。 【参考方案1】:

这里的问题是默认s3a上传不支持上传大于2GB或者2147483647字节的单个大文件。

('fs.s3a.multipart.threshold', '2147483647')

我的 EMR 版本比最近的版本旧,因此 multipart.threshold 参数只是一个整数,因此对于单个“部分”或文件,限制为 2147483647 字节。较新的版本使用 long 而不是 int,并且可以支持更大的单个文件大小限制。

我将使用解决方法将文件写入本地 hdfs,然后通过单独的 java 程序将其移动到 s3。

【讨论】:

以上是关于通过 EMR 写入 s3a 时出现 OutOfMemory 错误的主要内容,如果未能解决你的问题,请参考以下文章

未找到 AWS EMR s3a 文件系统

使用 Spark 通过 s3a 将 parquet 文件写入 s3 非常慢

Apache Spark Hadoop S3A SignatureDoesNotMatch

在 Spark 中的 EMR 上使用 --py-files 从 .zip 文件(使用 zipfile 包在 python 中创建)导入模块时出现问题

将 pyspark pandas_udf 与 AWS EMR 一起使用时出现“没有名为‘pandas’的模块”错误

尝试在 AWS EMR 中获得 GPU 支持时出现错误“一个 NVIDIA 内核模块 'nvidia' 似乎已加载到您的内核中”