第二个和第三个分布式 Kafka 连接器工作人员无法正常工作

Posted

技术标签:

【中文标题】第二个和第三个分布式 Kafka 连接器工作人员无法正常工作【英文标题】:Second and Third Distributed Kafka Connector workers failing to work correctly 【发布时间】:2017-05-06 08:18:06 【问题描述】:

使用 3 个 Kafka 集群和一个相同的 Zookeeper 集群,我启动了一个分布式连接器节点。这个节点成功运行了一个任务。然后我提出了第二个连接器,这似乎是因为任务中的一些代码确实运行了。然而,它似乎并没有保持活动状态(尽管没有抛出任何错误,但由于缺乏预期的活动而观察到不活动状态,而第一个连接器继续正常运行)。当我在每个连接器节点上调用 URL http://localhost:8083/connectors/mqtt/tasks 时,它告诉我连接器有一个任务。我希望这是两个任务,每个节点/工作者一个。 (目前工作人员配置为tasks.max = 1,但我也尝试将其设置为 3。

当我尝试调出第三个连接器时,出现错误:

"POST /connectors HTTP/1.1" 500 90  5 
(org.apache.kafka.connect.runtime.rest.RestServer:60)

ERROR IO error forwarding REST request: 
(org.apache.kafka.connect.runtime.rest.RestServer:241) 
java.net.ConnectException: Connection refused

尝试从 shell 再次调用连接器 POST 方法会返回错误:

 "error_code":500,"message":"IO Error trying to forward REST request:
 Connection refused"

我还尝试升级到今天发布的 Apache Kafka 0.10.1.1。我仍然看到问题。每个连接器都在由单个映像定义的隔离 Docker 容器上运行。它们应该是相同的。

问题可能是我试图在每个工作人员上运行对http://localhost:8083/connectors 的 POST 请求,而我只需要在单个工作人员上运行一次,然后该连接器的任务将自动分配给另一个工作人员工作人员。如果是这种情况,我该如何分配任务?我目前将最大值设置为三个,但似乎只有一个在单个工作器上运行。

更新

我最终使用 Yuri 建议的基本相同方法运行了一些东西。我给每个工作人员一个唯一的组 ID,然后给每个连接器任务赋予相同的名称。这允许三个连接器及其单个任务共享一个偏移量,因此在接收器连接器的情况下,它们从 Kafka 消费的消息不会重复。它们基本上作为独立连接器运行,因为工作人员具有不同的组 ID,因此不会相互通信。

如果连接器工作人员具有相同的组 ID,则您不能添加多个具有相同名称的连接器。如果您为连接器指定不同的名称,它们将具有不同的偏移量并使用重复的消息。如果您在同一个组中有三个工作人员,一个连接器和三个任务,理论上您将有一个理想的情况,其中任务共享一个偏移量并且工作人员确保任务始终运行且分布良好(每个任务消耗一个唯一的集合的分区)。在实践中,连接器框架不会创建多个任务,即使 tasks.max 设置为 3 并且主题任务正在使用时有 25 个分区。

如果有人知道我为什么会看到这种行为,请告诉我。

【问题讨论】:

【参考方案1】:

我遇到过和你一样的问题。

    Task.max 为主题配置,分布式工作人员自动决定哪些节点处理主题。因此,如果您在集群中有 3 个工作人员并且您的主题配置显示 task.max=2,那么 3 个工作人员中只有 2 个将处理该主题。理论上,如果其中一名工人失败,第三名应该接手工作。但是.. 分布式连接器被证明非常不可靠:一旦添加\删除一些节点,集群就会崩溃,所有工作人员什么都不做,只是试图选择领导者并失败了。修复的唯一方法是重新启动整个集群,最好是同时重新启动所有工作人员。

我选择了另一种方式 - 我使用了独立的 worker,它对我来说就像一个魅力,因为负载分配是在 Kafka 客户端级别实现的,一旦某个 worker 掉线,集群会自动重新平衡,并且客户端会连接到未占用的主题。

PS。也许它对你也有用。 Confluent 连接器不能容忍与主题架构不匹配的无效负载。一旦连接器收到一些无效消息,它就会默默地死掉。找出答案的唯一方法是分析指标。

【讨论】:

【参考方案2】:

我正在发布一个老问题的答案,因为 Kafka Connect has moved on a lot in three years。

在最新版本 (2.3.1) 中,incremental rebalancing 极大地改进了 Kafka Connect 的行为。

另外值得注意的是,在配置Kafka Connect的时候rest.advertised.host.name一定要设置正确,好像不是你会看到错误包括引用的那个

"error_code":500,"message":"IO Error trying to forward REST request: Connection refused"

更多详情请见this post。

【讨论】:

以上是关于第二个和第三个分布式 Kafka 连接器工作人员无法正常工作的主要内容,如果未能解决你的问题,请参考以下文章

XmlTextReader 忽略第二个和第三个 Profile 元素

如何使用选择器获取第二个和第三个元素?

在第二个和第三个连字符之间提取 TextString

单击预览后,如何在我的 React 应用程序上显示第二个和第三个按钮?

如何使用CHARINDEX和SUBSTRING在第二个和第三个“_”之间提取特定文本?

如何在不同的列中选择第一个、第二个和第三个值 - Ms Access