Kafka-Connect:在分布式模式下创建新连接器就是创建新组

Posted

技术标签:

【中文标题】Kafka-Connect:在分布式模式下创建新连接器就是创建新组【英文标题】:Kafka-Connect: Creating a new connector in distributed mode is creating new group 【发布时间】:2017-06-02 20:49:57 【问题描述】:

我目前正在使用 confluent 3.0.1 平台。我正在尝试在两个不同的工作人员上创建 2 个连接器,但尝试创建一个新的连接器正在为它创建一个新组。

Two connectors were created using below details:

1) POST http://devmetric.com:8083/connectors


    "name": "connector1",
    "config": 
        "connector.class": "com.xxx.kafka.connect.sink.DeliverySinkConnector",
        "tasks.max": "1",
        "topics": "dev.ps_primary_delivery",
        "elasticsearch.cluster.name": "ad_metrics_store",
        "elasticsearch.hosts": "devkafka1.com:9300",
        "elasticsearch.bulk.size": "100",
        "tenants": "tenant1"
    


2) POST http://devkafka01.com:8083/connectors


    "name": "connector2",
    "config": 
        "connector.class": "com.xxx.kafka.connect.sink.DeliverySinkConnector",
        "tasks.max": "1",
        "topics": "dev.ps_primary_delivery",
        "elasticsearch.cluster.name": "ad_metrics_store",
        "elasticsearch.hosts": "devkafka.com:9300",
        "elasticsearch.bulk.size": "100",
        "tenants": "tenant1"
    

但它们都是在不同的组 ID 下创建的。在此之后,我查询了现有的组。

$ sh ./bin/kafka-consumer-groups --bootstrap-server devmetric.com:9091  --new-consumer  --list

Result was:
connect-connector2
connect-connector1

这些组是由 Kafka connect 自动创建的,不是我提供的。我在worker.properties 中给出了不同的group.id。但我希望两个连接器都在同一个组下,以便它们并行工作以共享消息。截至目前,我在主题“dev.ps_primary_delivery”上有 100 万个数据,我希望两个连接器各获得 50 万个。

请告诉我该怎么做。

【问题讨论】:

【参考方案1】:

您可以将 consumer.group.id 设置为 Kafka Connect 可以采用的值并将其用作整个应用程序的 group.id

优势:您的应用程序连接到一个消费者组 缺点:你应该小心消费者组的配置。让它们看起来都一样

【讨论】:

甚至尝试在配置中添加consumer.group.id,但它只是将消费者分配为“connect-”格式。【参考方案2】:

我认为有必要澄清一下……

    worker.properties 文件中的group.id 未引用消费者组。它是一个“工作组”——同一个工作组中的多个工作人员将在他们之间分配工作——所以如果同一个连接器有很多任务(例如 JDBC 连接器对每个表都有一个任务),这些任务将分配给所有小组中的工人。

    接收器连接器确实有属于消费者组的消费者。该组的group.id 始终为“connect-”+连接器名称。在您的情况下,您根据连接器名称获得了“connect-connector1”和“connect-connector2”。这也意味着两个连接器位于同一组中的唯一方法是......如果它们具有相同的名称。但是名称是唯一的,因此同一组中不能有两个连接器。原因是……

    连接器本身并不真正获取事件,它们只是启动一堆任务。每个任务都有属于连接器消费者组的消费者,每个任务将独立处理主题和分区的子集。所以在同一个组中有两个连接器,基本上意味着他们所有的任务都属于同一个组 - 那么为什么需要两个连接器呢?只需为该连接器配置更多主题和更多任务即可。

唯一的例外是如果您使用的连接器未正确使用任务或将您限制为仅执行一项任务。在这种情况下——要么他们有充分的理由,要么(更有可能)有人需要改进他们的连接器......

【讨论】:

感谢您的澄清。我浏览了 Kafka-connect 代码,我可以得到消费者组和工作组之间的区别。 如果我有两个彼此不认识的 kafka-connect 实例怎么办?连接器的名称还需要唯一吗?

以上是关于Kafka-Connect:在分布式模式下创建新连接器就是创建新组的主要内容,如果未能解决你的问题,请参考以下文章

没有模式注册表的 Kafka-connect

使用独立模式 Kafka-connect 将 Postgresql 的数据捕获更改为 kafka 主题

卡夫卡连接 |由于操作冲突,无法完成请求

kafka-connect JDBC PostgreSQL Sink Connector 显式定义 PostgrSQL 模式(命名空间)

使用 Docker Compose 创建 Kafka-Connect 集群以供 ksqlDB 使用

使用本地 kafka-connect 集群连接远程数据库的连接超时