Kafka 连接器的动态创建
Posted
技术标签:
【中文标题】Kafka 连接器的动态创建【英文标题】:Dynamic creation of Kafka Connectors 【发布时间】:2021-03-16 11:30:33 【问题描述】:我使用 Strimzi 和 AKS 在 kubernetes 中部署了一个 Kafka 集群和一个 Kafka Connect 集群。我想开始从 RSS 资源中读取数据来为我的 Kafka 集群提供数据,所以我创建了一个“org.kaliy.kafka.connect.rss.RssSourceConnector”的连接器实例,它从特定的 RSS 源读取,给定一个 url,然后写入一个特定的话题。但我的全部意图是最终拥有一个 Kafka Connect 集群,能够管理大量新 RSS 的外部请求以进行读取;这就是我所有疑问的来源:
我应该为每个 RSS 提要创建一个 Kaliy RSS 连接器实例吗?还是实现我自己的连接器会更好,所以我只创建它的一个实例,每次我想阅读新的 RSS 提要时,我都会在连接器中创建一个新任务? 谁应该负责确保 Kafka Connect 集群状态是理想状态?我的意思是,如果一个连接器(在 1 个 RSS 提要的情况下:1 个连接器实例)停止工作,谁应该尝试重新启动它?通过 Kafka Connect REST API 的外部客户端? Kubernetes 本身?现在,我认为我最好的选择是依靠 Kafka Connect REST API 让外部客户端负责管理连接器集的状态,但我不知道这些是否旨在接收大量请求情况就是这样。也许这些可以通过在 Kafka Connect REST API 配置中配置多个侦听器来扩展,但我不知道。 非常感谢!
【问题讨论】:
我们使用外部脚本来监控连接器,但是可以编写一个 k8s 操作符以同样的方式工作。关于第一点,如果每个实例只能读取一个 RSS 提要,则多个连接器应该没问题 【参考方案1】:使用 Kafka Connect 的主要好处之一是利用了配置驱动的方法,因此您将因实施自己的连接器而失去这一点。在我看来,最好的策略是为每个 RSS 提要拥有一个连接器实例。在拥有单一数据源系统时减少实例数量可能是有意义的,以避免过载。
使用Strimzi Operator,Kafka Connect 集群将受到监控,并在需要时尝试恢复所需的集群状态。这不包括单个连接器实例及其任务,但您可以利用 K8s API 来监控连接器自定义资源 (CR) 状态,而不是 REST API。
例子:
$ kubetctl get kafkaconnector amq-sink -o yaml
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
# ...
status:
conditions:
- lastTransitionTime: "2020-12-07T10:30:28.349Z"
status: "True"
type: Ready
connectorStatus:
connector:
state: RUNNING
worker_id: 10.116.0.66:8083
name: amq-sink
tasks:
- id: 0
state: RUNNING
worker_id: 10.116.0.66:8083
type: sink
observedGeneration: 1
【讨论】:
以上是关于Kafka 连接器的动态创建的主要内容,如果未能解决你的问题,请参考以下文章