如何阻止 Spark 结构化流每次都列出 S3 存储桶中的所有文件
Posted
技术标签:
【中文标题】如何阻止 Spark 结构化流每次都列出 S3 存储桶中的所有文件【英文标题】:How to stop spark structured streaming from listing all files in an S3 bucket every time 【发布时间】:2018-06-15 14:37:58 【问题描述】:我在 pyspark 上有一个结构化的流式传输作业,它对文件源进行一些聚合。我有一个 kinesis firehose 将来自 IoT 类型应用程序的数据组合起来,并将数据存储在 S3 位置上,每分钟将一个文件存储在以下文件夹结构中的不同文件夹中 -
s3://year/month/day/hour/
我的 spark 结构化流式传输作业似乎无法列出我的 S3 存储桶中的所有可用文件。由于上市过程似乎比我设置的 processingTime 花费更多的时间。我收到以下警告,我想知道是否有办法不让这种情况发生。
18/06/15 14:28:35 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 60000 milliseconds, but spent 74364 milliseconds
18/06/15 14:28:42 WARN FileStreamSource: Listed 4449 file(s) in 6822.134244 ms
18/06/15 14:29:06 WARN FileStreamSource: Listed 4449 file(s) in 6478.381219 ms
18/06/15 14:30:08 WARN FileStreamSource: Listed 4450 file(s) in 8285.654031 ms
【问题讨论】:
【参考方案1】:S3 API List 操作只能用于检索共享前缀的存储桶中的所有对象键。所以根本不可能只列出新的、未处理的对象。 Databricks 的人似乎有a solution,您可以在其中设置 S3 以在创建新对象时创建 SQS 记录。 Spark 然后检查 SQS 是否有新对象并从 S3 检索特定对象(即不涉及列表)。不幸的是,此连接器似乎仅在 Databricks 集群上可用并且尚未开源,因此如果您使用例如 EMR,则无法使用它(当然,除非您自己实现连接器)。
【讨论】:
【参考方案2】:类FileStreamSource
的评论:
// 列出文件使用超过2秒时输出警告。
因此,要消除此警告,您可以减少每次触发器处理的文件数量:
maxFilesPerTrigger
选项可以在文件源上设置,以确保它需要
第一个警告是您设置的触发间隔 (60000
) 比所用时间短 (74364
)。只需增加触发间隔即可摆脱这种情况。
【讨论】:
我明白,事实上我在我的代码中使用了 maxFilesPerTrigger。但是,它似乎想要列出文件。我也想知道为什么 Spark 每次都需要再次列出文件。 文件流源总是列出文件。如果花费的时间超过阈值,它会在 log.warn 消息中显示它,但如果不是,它仍然会在跟踪日志级别记录文件列表以上是关于如何阻止 Spark 结构化流每次都列出 S3 存储桶中的所有文件的主要内容,如果未能解决你的问题,请参考以下文章
如何将 Spark 结构化流与 Kafka Direct Stream 结合使用?
如何将 Kinesis 流存储到 S3 存储桶中特定文件夹结构中的 S3 存储
ini 列出nginx以阻止多个服务器内的垃圾邮件机器人。每次更改时都需要自动提取此列表。