使用 Ctrl+C 停止 Debezium 连接器并再次重新启动连接器后连接器不起作用

Posted

技术标签:

【中文标题】使用 Ctrl+C 停止 Debezium 连接器并再次重新启动连接器后连接器不起作用【英文标题】:The connector does not work after stopping the Debezium Connector with Ctrl+C and restart the connector again 【发布时间】:2021-05-14 22:25:16 【问题描述】:

我使用 Debezium mysql 连接器从我的数据库中捕获更改数据。它以前可以很好地工作。但是,现在,在我停止连接器并重新启动后,连接器无法正常工作。我今天遇到了这个问题,我相信它昨天运行良好。有人可以帮忙吗?太感谢了! 这些是错误消息。

[2021-02-11 01:19:13,581] INFO Kafka version: 6.0.1-ccs (org.apache.kafka.common.utils.AppInfoParser:117)
[2021-02-11 01:19:13,581] INFO Kafka commitId: 9c1fbb3db1e0d69d (org.apache.kafka.common.utils.AppInfoParser:118)
[2021-02-11 01:19:13,581] INFO Kafka startTimeMs: 1613027953581 (org.apache.kafka.common.utils.AppInfoParser:119)
[2021-02-11 01:19:13,582] INFO WorkerSourceTaskid=test-location-connector-0 Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:481)
[2021-02-11 01:19:13,582] INFO WorkerSourceTaskid=test-location-connector-0 flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:498)
[2021-02-11 01:19:13,582] ERROR WorkerSourceTaskid=test-location-connector-0 Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
java.util.concurrent.RejectedExecutionException: Task io.debezium.relational.history.KafkaDatabaseHistory$$Lambda$732/0x000000080062a840@5d307667 rejected from java.util.concurrent.ThreadPoolExecutor@4d465813[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
    at java.base/java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:687)
    at io.debezium.relational.history.KafkaDatabaseHistory.checkTopicSettings(KafkaDatabaseHistory.java:382)
    at io.debezium.relational.history.KafkaDatabaseHistory.exists(KafkaDatabaseHistory.java:362)
    at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:45)
    at io.debezium.connector.mysql.MySqlConnectorTask.validateAndLoadDatabaseHistory(MySqlConnectorTask.java:305)
    at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:92)
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:106)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
[2021-02-11 01:19:13,583] ERROR WorkerSourceTaskid=test-location-connector-0 Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:188)
[2021-02-11 01:19:13,583] INFO Stopping down connector (io.debezium.connector.common.BaseSourceTask:192)
[2021-02-11 01:19:13,584] INFO Connection gracefully closed (io.debezium.jdbc.JdbcConnection:946)
[2021-02-11 01:19:13,585] INFO [Producer clientId=MysqlTest2-dbhistory] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer:1189)
[2021-02-11 01:19:13,586] INFO [Producer clientId=connector-producer-test-location-connector-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1189)
[2021-02-11 01:19:23,410] INFO WorkerSourceTaskid=test-location-connector-0 Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:481)
[2021-02-11 01:19:23,411] INFO WorkerSourceTaskid=test-location-connector-0 flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:498)
[2021-02-11 01:19:33,416] INFO WorkerSourceTaskid=test-location-connector-0 Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:481)

【问题讨论】:

你是如何重启连接器的? 可能 ctrl+c 不会断开连接器与 Debezium 服务的连接。您应该尝试删除连接器(curl -X DELETE "/test-location-connector-0"),然后重新创建它。 第一次命令是/connect-standalone /worker.properties /mysql-connector.properties,(连接器插件路径设置在worker.properties中)我用Crtl + C停止它,我用相同的命令重新启动它。当您谈论 时,我只有一个 debezium-connector-mysql 文件夹,其中包含多个 jar 文件。有详细资料吗?非常感谢你们! 【参考方案1】:

这是目前正在开发的 Debezium 1.5 版本的问题。在我切换回 1.4 后,一切正常。

【讨论】:

【参考方案2】:

即使使用 Debezium 1.4 也有同样的问题。

对我有用的解决方法是将连接器 .properties 文件的 name 更改为其他内容

现在,再次运行独立连接命令。它应该可以工作。

如何干净地停止连接器

每次都更改名称很烦人,但有一种方法可以避免这种情况。 按照 Vitor 的评论,您必须通过 Kafka Connect REST interface 连接器 DELETE before 执行 Ctrl+C。我假设你在localhost:8083(默认)上独立运行。

首先检查 Connect REST 接口是否工作。我会用HTTPie:

http GET localhost:8083/connectors

如果它正常工作,您应该在列表中获得连接器名称列表,例如

[
    "testconnector"
]

然后您应该删除此连接器。

http DELETE localhost:8083/connectors/testconnector

在这之后,你终于可以Ctrl+C了。 您可能需要重新启动 Kafka 和 Zookeeper 进程,然后才能再次尝试运行 Connect。

【讨论】:

以上是关于使用 Ctrl+C 停止 Debezium 连接器并再次重新启动连接器后连接器不起作用的主要内容,如果未能解决你的问题,请参考以下文章

MySQL 的 Debezium 连接器。缺少数据库历史主题

kafka connect debezium mongodb连接器无法加载

Debezium 有没有办法停止数据序列化?试图从源代码中获取值

创建 mongo debezium 连接器时出现身份验证错误

SQL Server 的 Debezium 连接器,并希望在 debezium 层进行过滤

更改 Debezium 连接器使用的主题名称