Spark s3数据框选择失败:ConnectionClosedException内容长度过早结束

Posted

技术标签:

【中文标题】Spark s3数据框选择失败:ConnectionClosedException内容长度过早结束【英文标题】:Spark s3 data frame Select fails: ConnectionClosedException Premature end of Content-Length 【发布时间】:2019-09-13 08:34:31 【问题描述】:

我对 Spark 和整个生态系统非常陌生,因此对于发起的错误来说我是愚蠢的,但我没有发现任何支持或类似的问题被发布。

我在 S3 存储桶上有大量数据 (TB),这些数据被拆分为子目录中的数千个 100Mb 镶木地板文件。目前,我只想查询一个文件并选择一些行。我在学习时使用 PySpark 在本地运行 spark (3.0):

代码如下:

spark = SparkSession.builder \
    .master("local") \
    .appName("Test") \
    .getOrCreate()

path = "s3a://BUCKET_NAME/DIR/FILE.gz.parquet"
df = spark.read.parquet(path)

df.printSchema()   # this works
df.show(n=10)      # this works 
df.orderBy("sessionID").show(n=5)   # this works
df.select("sessionID").show(n=5)   # this fails

OrderBy 工作正常并快速显示按名称排序的前 5 个。但是,选择查询失败:

19/09/13 01:16:28 ERROR TaskContextImpl: Error in TaskCompletionListener
org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 74915373; received: 45265606
at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)

所以我认为选择操作没有从 S3 存储桶接收完整数据,但我该怎么办?为什么 OrderBy 有效?

以下问题有点开放式。数据被组织成需要一次性处理的会话。但是每个会话的行分散在每个 parquet 文件和数百个 parquet 文件中,这意味着必须遍历数百 GB 才能拼凑一个完整的会话。所以我希望 Spark 按会话 ID 排序。该处理将由一个单独的 C++ 库完成,因此我必须将会话数据通过管道输出。在本地机器上处理整个数据集将是棘手的,但我可以应用一些选择将数据减少到 50Gb 之类的东西,我希望可以在几个小时内在功能强大的工作站(32 核、64 Gb)上处理这些数据。这可行吗?设计看起来如何?抱歉,这含糊不清,但 Spark 示例要么在一个很小的 ​​JSON 上非常简单,要么假设有非常深入的知识,并且很难从前者过渡到后者。

【问题讨论】:

问题与 spark 3.0 有关 【参考方案1】:

在花费数小时浏览不同的配置选项并且基本上没有从我开始的地方开始之后。结果系统管理员安装了最新的 Spark 3.0,但肯定有问题。

我安装了 spark 2.4.4,确保选择了 java 8 Pyspark error - Unsupported class file major version 55

一切都按预期进行

【讨论】:

我正在运行 Spark 2.4.7,构建了 hadoop 2.7.3,hadoop-aws 2.7.3 和 aws-java-sdk 1.7.4,但仍面临此错误。知道可能出了什么问题吗?

以上是关于Spark s3数据框选择失败:ConnectionClosedException内容长度过早结束的主要内容,如果未能解决你的问题,请参考以下文章

使用 403 写入 S3 时,在 EMR 上运行的 Spark 偶尔会失败

使用 pyspark 将镶木地板文件(在 aws s3 中)存储到 spark 数据框中

Spark中转换的失败处理

Spark S3A 写入省略了上传部分而没有失败

如何加快 Spark 中的大数据框连接

Spark 失败,因为 S3 文件已更新。如何消除这个错误?