如何阻止 Spark 结构化流每次都列出 S3 存储桶中的所有文件

Posted

技术标签:

【中文标题】如何阻止 Spark 结构化流每次都列出 S3 存储桶中的所有文件【英文标题】:How to stop spark structured streaming from listing all files in an S3 bucket every time 【发布时间】:2018-06-15 14:37:58 【问题描述】:

我在 pyspark 上有一个结构化的流式传输作业,它对文件源进行一些聚合。我有一个 kinesis firehose 将来自 IoT 类型应用程序的数据组合起来,并将数据存储在 S3 位置上,每分钟将一个文件存储在以下文件夹结构中的不同文件夹中 -

s3://year/month/day/hour/

我的 spark 结构化流式传输作业似乎无法列出我的 S3 存储桶中的所有可用文件。由于上市过程似乎比我设置的 processingTime 花费更多的时间。我收到以下警告,我想知道是否有办法不让这种情况发生。

18/06/15 14:28:35 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 60000 milliseconds, but spent 74364 milliseconds
18/06/15 14:28:42 WARN FileStreamSource: Listed 4449 file(s) in 6822.134244 ms
18/06/15 14:29:06 WARN FileStreamSource: Listed 4449 file(s) in 6478.381219 ms
18/06/15 14:30:08 WARN FileStreamSource: Listed 4450 file(s) in 8285.654031 ms

【问题讨论】:

【参考方案1】:

S3 API List 操作只能用于检索共享前缀的存储桶中的所有对象键。所以根本不可能只列出新的、未处理的对象。 Databricks 的人似乎有a solution,您可以在其中设置 S3 以在创建新对象时创建 SQS 记录。 Spark 然后检查 SQS 是否有新对象并从 S3 检索特定对象(即不涉及列表)。不幸的是,此连接器似乎仅在 Databricks 集群上可用并且尚未开源,因此如果您使用例如 EMR,则无法使用它(当然,除非您自己实现连接器)。

【讨论】:

【参考方案2】:

FileStreamSource的评论:

// 列出文件使用超过2秒时输出警告。

因此,要消除此警告,您可以减少每次触发器处理的文件数量:

maxFilesPerTrigger 选项可以在文件源上设置,以确保它需要

第一个警告是您设置的触发间隔 (60000) 比所用时间短 (74364)。只需增加触发间隔即可摆脱这种情况。

【讨论】:

我明白,事实上我在我的代码中使用了 maxFilesPerTrigger。但是,它似乎想要列出文件。我也想知道为什么 Spark 每次都需要再次列出文件。 文件流源总是列出文件。如果花费的时间超过阈值,它会在 log.warn 消息中显示它,但如果不是,它仍然会在跟踪日志级别记录文件列表

以上是关于如何阻止 Spark 结构化流每次都列出 S3 存储桶中的所有文件的主要内容,如果未能解决你的问题,请参考以下文章

如何将 Spark 结构化流与 Kafka Direct Stream 结合使用?

如何将 Kinesis 流存储到 S3 存储桶中特定文件夹结构中的 S3 存储

ini 列出nginx以阻止多个服务器内的垃圾邮件机器人。每次更改时都需要自动提取此列表。

如何将 Spark Streaming 检查点位置存储到 S3 中?

如何将 Spark 结构化流数据写入 REST API?

如何在 Spark 结构化流中获取书面记录的数量?