扩展 Kafka Connect 以处理 10K S3 存储桶
Posted
技术标签:
【中文标题】扩展 Kafka Connect 以处理 10K S3 存储桶【英文标题】:Scaling Kafka Connect to handle 10K S3 buckets 【发布时间】:2021-10-03 22:17:12 【问题描述】:我想将各种 S3 存储桶(超过 10,000 个存储桶,每个文件大约 20-50MB)中的数据加载到 Apache Kafka 中。存储桶列表是动态的 - 在运行时添加和删除存储桶。理想情况下,每个存储桶配置应该有自己的轮询间隔(扫描新文件的频率 - 至少 60 秒,但可能更多)和优先级(正在处理的并发文件数)。
请注意,由于每个存储桶所有者的组织中存在各种 IT 政策,因此无法设置从每个 S3 存储桶到 SQS/SNS/Lambda 的通知。
Kafka Connect 似乎是此类任务最常用的工具,它的可插拔架构将使将来添加新源变得更加容易,因此非常适合。将每个 S3 存储桶配置为其自己的连接器将使我能够为每个任务设置不同数量的任务(映射到优先级)和轮询间隔。并且为我预期的文件格式构建 Java 自定义 Kafka Connect 源任务听起来很合理。
但是,Kafka Connect code 表示每个正在运行的任务在任务的生命周期内都分配有自己的线程。因此,如果我有 10K 个存储桶,每个存储桶都配置有自己的连接器和单个任务,那么我的 Kafka Connect 分布式工作池中将运行 10K 个线程。很多线程大多只是 sleep()-ing。
在 Kafka Connect 中扩展任务/连接器数量的正确方法是什么?
【问题讨论】:
【参考方案1】:Kafka Connect 是分布式框架,可以作为独立模式或分布式工作,作为分布式框架,您正在从多个商品服务器创建 kafka 连接集群,每个服务器托管 kafka 连接实例并可以执行连接器的任务,如果您需要更多功能您可以添加更多托管连接实例的服务器,
阅读 S3 源连接器文档我没有找到“白名单”/“正则表达式”以从多个存储桶中读取它的方法...
【讨论】:
当然,我可以添加更多分布式连接工作程序,但我仍然会运行大量基本上只是休眠的线程。这真是一种浪费。以上是关于扩展 Kafka Connect 以处理 10K S3 存储桶的主要内容,如果未能解决你的问题,请参考以下文章
Apache Kafka Connect JNDI注入漏洞复现(CVE-2023-25194)
使用 cp-kafka-connect-base 为 snowflake-kafka-connector 构建一个组合的 docker 映像,以部署在 kafka connect 集群上