在 pyspark 中使用 S3a 保存文件时获取 S3 响应代码(仅限 HTTP 代码,如 200,300,400,403,500 等)

Posted

技术标签:

【中文标题】在 pyspark 中使用 S3a 保存文件时获取 S3 响应代码(仅限 HTTP 代码,如 200,300,400,403,500 等)【英文标题】:Getting S3 Response code (Only the HTTP code like 200,300,400,403,500 Etc) while saving file using S3a in pyspark 【发布时间】:2019-02-28 21:47:16 【问题描述】:

我正在尝试获取 HTTP 代码并将其存储在 RDS 表中,以便以后分析 pyspark 作业,该作业将使用 S3a 将文件以 AVRO 格式保存到 S3。保存文件后,我知道 S3 将返回状态代码,但我不确定如何在代码中记录它。请找到代码的sn-p。

def s3_load(df, row):
    df.write.\
       format("com.databricks.spark.avro").\
       save("s3a://Test-" + row["PARTNER"].lower() + "/" + row["TABLE_NAME"] + "/" +
            datetime.datetime.today().strftime('%Y%m%d'))

在上面的代码中,我希望 o 以状态码的形式返回。 注意:我可以将 S3 中的文件保存为 AVRO 格式。

谢谢

【问题讨论】:

【参考方案1】:

这是这个问题中讨论的类似概念,获取包装 s3 API 的库或函数的状态代码:Amazon S3 POST, event when done?


最终,如果 databricks 是处理上传的库,则来自 df.write.save(...) 函数调用的结果响应代码将在 databricks 函数调用的结果中的某处找到。

Databricks 支持 s3s3a 作为保存文件的目标位置 (as shown in their docs here),但似乎 databricks 并未在此处显示来自底层操作的响应代码(也许他们这样做,我找不到它在任何文档中)。

一些前进的选择:

1234563 /p>

在 AWS 上,s3 存储桶上传是一个事件源,可用作其他操作的触发器,例如调用 AWS Lambda,您可以使用它来调用任意云托管函数。 what this architecture would look like in this tutorial.上提供大量信息

根据并行上传的需要,你可以使用AWS官方python库boto3重写你的小上传函数。讨论如何处理那些错误/响应码discussed here.

Databricks 似乎在其enterprise offering. 中的某处也具有审计日志记录功能

【讨论】:

谢谢,我使用了 Try/Except,现在我至少可以确定我的工作是否成功。

以上是关于在 pyspark 中使用 S3a 保存文件时获取 S3 响应代码(仅限 HTTP 代码,如 200,300,400,403,500 等)的主要内容,如果未能解决你的问题,请参考以下文章

通过 EMR 写入 s3a 时出现 OutOfMemory 错误

PySpark:在 Spark 数据框中读取多个 XML 文件(s3 路径列表)

Pyspark:UDF 将正则表达式应用于数据帧中的每一行

从 pyspark 访问 S3 存储桶中的文件

尝试使用 pyspark 从 S3 获取数据时出现空指针异常

使用 PySpark 从 Amazon S3 读取文本文件