Spark S3A 写入省略了上传部分而没有失败

Posted

技术标签:

【中文标题】Spark S3A 写入省略了上传部分而没有失败【英文标题】:Spark S3A write omits upload part without failure 【发布时间】:2019-02-28 23:25:31 【问题描述】:

我正在使用 Spark 2.4.0 和 Hadoop 2.7、hadoop-aws 2.7.5 将数据集写入 S3A 上的镶木地板文件。有时会丢失文件部分;即这里的部分00003

> aws s3 ls my-bucket/folder/
2019-02-28 13:07:21          0 _SUCCESS
2019-02-28 13:06:58   79428651 part-00000-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:06:59   79586172 part-00001-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:00   79561910 part-00002-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:01   79192617 part-00004-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:07   79364413 part-00005-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:08   79623254 part-00006-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:10   79445030 part-00007-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:10   79474923 part-00008-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:11   79477310 part-00009-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:12   79331453 part-00010-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:13   79567600 part-00011-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:13   79388012 part-00012-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:14   79308387 part-00013-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:15   79455483 part-00014-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:17   79512342 part-00015-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:18   79403307 part-00016-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:18   79617769 part-00017-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:19   79333534 part-00018-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:20   79543324 part-00019-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet

我最关心的是 Spark 应用程序SUCCEEDS

stderr 对于驱动程序和执行程序看起来都非常正常 stdout 对于驱动程序来说看起来很正常 只有执行程序的标准输出给出了出现问题的任何迹象:
2019-02-28 21:05:39 INFO  AmazonHttpClient:448 - Unable to execute HTTP request: Read timed out
java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at org.apache.http.impl.io.AbstractSessionInputBuffer.fillBuffer(AbstractSessionInputBuffer.java:161)
    at org.apache.http.impl.io.SocketInputBuffer.fillBuffer(SocketInputBuffer.java:82)
    at org.apache.http.impl.io.AbstractSessionInputBuffer.readLine(AbstractSessionInputBuffer.java:278)
    at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)
    at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)
    at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259)
    at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:286)
    at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:257)
    at org.apache.http.impl.conn.ManagedClientConnectionImpl.receiveResponseHeader(ManagedClientConnectionImpl.java:207)
    at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273)
    at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:66)
    at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125)
    at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:684)
    at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:486)
    at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:835)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:384)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3480)
    at com.amazonaws.services.s3.AmazonS3Client.listObjects(AmazonS3Client.java:604)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:960)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.deleteUnnecessaryFakeDirectories(S3AFileSystem.java:1144)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.finishedWrite(S3AFileSystem.java:1133)
    at org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:142)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64)
    at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:685)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:122)
    at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:244)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:239)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:245)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
...

(此堆栈跟踪重复了 6 次)

我正在调整 Hadoop S3A 配置,以查看是否可以减少这种情况发生的频率,但我真正想要的是让应用程序在发生这种情况时FAIL。实际上,下游应用程序启动时期望数据存在,并由于缺少数据而产生不正确的结果。

在这种情况下如何更改 Spark/Hadoop 的行为?

【问题讨论】:

【参考方案1】:

似乎不可能解决这个问题(至少在 Hadoop 2.7 中),所以现在我在每次 Spark S3 写入后添加了一个断言,以确保文件部分的数量与数据集中的分区数量相匹配RDD:

  def overwriteParquetS3(
    ds: Dataset[_],
    bucket: String,
    folder: String
  ): Unit = 
    val numPartitions = ds.rdd.getNumPartitions
    val destination = GeneralUtils.joinPaths("s3a://", bucket, folder)

    ds
        .write
        .mode(SaveMode.Overwrite)
        .parquet(destination)

    val fs = FileSystem.get(
      URI.create(s"s3a://$bucket/"),
      ds.sparkSession.sparkContext.hadoopConfiguration
    )
    val writtenFiles = fs.listFiles(new Path(destination), false)
    val parts = new ArrayBuffer[LocatedFileStatus]()
    while (writtenFiles.hasNext) 
      val next = writtenFiles.next()
      val name = next.getPath.getName
      if (name.startsWith("part-") && name.endsWith(".parquet")) 
        parts += next
      
    

    val filePartStr = parts
        .sortBy(_.getPath.getName)
        .map((fileStatus) => s"$fileStatus.getModificationTime $fileStatus.getBlockSize $fileStatus.getPath.getName")
        .mkString("\n\t")
    assert(
      parts.length == numPartitions,
      s"Expected to write dataframe with $numPartitions partitions in $destination but instead " +
          s"found $parts.length written parts!\n\t$filePartStr"
    )

    println(s"Confirmed that there numPartitions $numPartitions = $parts.length written parts")
  

这似乎涵盖了写入应该出错但没有出错的所有情况。

【讨论】:

这里的 FS 绑定仍然有问题,尽管这不是简单的 AWS S3 一致性。我会考虑使用 spark JIRA 提交错误(在此处指向它)&我们可以查看它。 FWIW,Cloudera 确实通过 Ceph 对连接器的 Hadoop 3.0 版本进行了认证,我不记得有任何与该资格相关的特定错误被提交(我会听说过)。 感谢您的建议,在这里创建 JIRA:issues.apache.org/jira/browse/SPARK-27098 @SteveLoughran 反映了我在 JIRA 中的评论,您如何看待根本没有关于缺失部分、重命名或其他方式的调试日志?似乎 Spark 只是忘记了那个分区? 尝试看看是否可以使用不同的 FS 复制问题,或者在 hadoop v1 和 v2 提交算法之间切换。火花驱动程序将在报告时告诉任务提交;如果没有成功,spark 将尝试新任务。作业提交 (v1) 将所有已提交的任务输出提升到 dest 目录。 V2:只创建 _SUCCESS 文件,因为任务直接提交到 dest 目录。两者都使用重命名,并依赖于一致的列表 +关于提交协议的论文。 github.com/steveloughran/zero-rename-committer/releases 它的工作原理真是太棒了【参考方案2】:

这被称为“文件系统不一致与作业提交者的副作用,该作业提交者依赖于一致的目录列表将工作重命名到位”

修复

使用一致性层;对于 S3A,即 S3Guard 使用备用提交者:对于 ASF Spark 和 Hadoop 3.1,这就是“零重命名提交者” 激进但从长远来看最好:使用不同的数据布局,我想到的是 Apache Iceberg

更新:在此特定实例中并非如此,因为 Ceph 是 FS 并且是一致的。

【讨论】:

这些看起来是有用的工具,但是 * 前两个在 Hadoop 2.7 中不可用。 * 我实际上并没有使用 AWS,也不会开始使用。 Hadoop 清楚地意识到了这种情况下的失败。符合 S3 的数据湖的最终一致性没有错。考虑到这一点,我怎样才能让它抛出错误? 您说您是“S3A 上的镶木地板文件”,所以您使用的是 AWS S3 或第三方如果您使用的是第三方 S3 存储,那么它应该是一致的,但您很遗憾你自己。这将是配置问题,例如某些进程仍在尝试 AWS us-east 进行 S3 访问。 顺便说一句,关于堆栈跟踪:这只是一些清理期间的超时,在最近的 hadoop 版本中被捕获并忽略了一些东西。不太可能是根本原因 不,我既没有使用 AWS,也没有使用第三方。这是我自己硬件上的 Ceph 实例。绝对不是配置问题,但您可能对堆栈跟踪与写入失败无关。

以上是关于Spark S3A 写入省略了上传部分而没有失败的主要内容,如果未能解决你的问题,请参考以下文章

Spark 写入 S3 V4 SignatureDoesNotMatch 错误

通过 EMR 写入 s3a 时出现 OutOfMemory 错误

如何从 Apache Spark 访问 s3a:// 文件?

PySpark:AWS s3n 正在工作,但 s3a 没有

Apache Spark s3a 提交者 - 线程堆栈 - 内存不足问题

使用 Spark 访问 s3a 时出现 403 错误