使用 Ctrl+C 停止 Debezium 连接器并再次重新启动连接器后连接器不起作用
Posted
技术标签:
【中文标题】使用 Ctrl+C 停止 Debezium 连接器并再次重新启动连接器后连接器不起作用【英文标题】:The connector does not work after stopping the Debezium Connector with Ctrl+C and restart the connector again 【发布时间】:2021-05-14 22:25:16 【问题描述】:我使用 Debezium mysql 连接器从我的数据库中捕获更改数据。它以前可以很好地工作。但是,现在,在我停止连接器并重新启动后,连接器无法正常工作。我今天遇到了这个问题,我相信它昨天运行良好。有人可以帮忙吗?太感谢了! 这些是错误消息。
[2021-02-11 01:19:13,581] INFO Kafka version: 6.0.1-ccs (org.apache.kafka.common.utils.AppInfoParser:117)
[2021-02-11 01:19:13,581] INFO Kafka commitId: 9c1fbb3db1e0d69d (org.apache.kafka.common.utils.AppInfoParser:118)
[2021-02-11 01:19:13,581] INFO Kafka startTimeMs: 1613027953581 (org.apache.kafka.common.utils.AppInfoParser:119)
[2021-02-11 01:19:13,582] INFO WorkerSourceTaskid=test-location-connector-0 Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:481)
[2021-02-11 01:19:13,582] INFO WorkerSourceTaskid=test-location-connector-0 flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:498)
[2021-02-11 01:19:13,582] ERROR WorkerSourceTaskid=test-location-connector-0 Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
java.util.concurrent.RejectedExecutionException: Task io.debezium.relational.history.KafkaDatabaseHistory$$Lambda$732/0x000000080062a840@5d307667 rejected from java.util.concurrent.ThreadPoolExecutor@4d465813[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
at java.base/java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:687)
at io.debezium.relational.history.KafkaDatabaseHistory.checkTopicSettings(KafkaDatabaseHistory.java:382)
at io.debezium.relational.history.KafkaDatabaseHistory.exists(KafkaDatabaseHistory.java:362)
at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:45)
at io.debezium.connector.mysql.MySqlConnectorTask.validateAndLoadDatabaseHistory(MySqlConnectorTask.java:305)
at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:92)
at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:106)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
[2021-02-11 01:19:13,583] ERROR WorkerSourceTaskid=test-location-connector-0 Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:188)
[2021-02-11 01:19:13,583] INFO Stopping down connector (io.debezium.connector.common.BaseSourceTask:192)
[2021-02-11 01:19:13,584] INFO Connection gracefully closed (io.debezium.jdbc.JdbcConnection:946)
[2021-02-11 01:19:13,585] INFO [Producer clientId=MysqlTest2-dbhistory] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer:1189)
[2021-02-11 01:19:13,586] INFO [Producer clientId=connector-producer-test-location-connector-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1189)
[2021-02-11 01:19:23,410] INFO WorkerSourceTaskid=test-location-connector-0 Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:481)
[2021-02-11 01:19:23,411] INFO WorkerSourceTaskid=test-location-connector-0 flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:498)
[2021-02-11 01:19:33,416] INFO WorkerSourceTaskid=test-location-connector-0 Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:481)
【问题讨论】:
你是如何重启连接器的? 可能 ctrl+c 不会断开连接器与 Debezium 服务的连接。您应该尝试删除连接器(curl -X DELETE "/connect-standalone /worker.properties /mysql-connector.properties
,(连接器插件路径设置在worker.properties中)我用Crtl + C停止它,我用相同的命令重新启动它。当您谈论 这是目前正在开发的 Debezium 1.5 版本的问题。在我切换回 1.4 后,一切正常。
【讨论】:
【参考方案2】:即使使用 Debezium 1.4 也有同样的问题。
对我有用的解决方法是将连接器 .properties 文件的 name
更改为其他内容。
现在,再次运行独立连接命令。它应该可以工作。
如何干净地停止连接器
每次都更改名称很烦人,但有一种方法可以避免这种情况。
按照 Vitor 的评论,您必须通过 Kafka Connect REST interface 连接器 DELETE
before 执行 Ctrl+C。我假设你在localhost:8083
(默认)上独立运行。
首先检查 Connect REST 接口是否工作。我会用HTTPie:
http GET localhost:8083/connectors
如果它正常工作,您应该在列表中获得连接器名称列表,例如
[
"testconnector"
]
然后您应该删除此连接器。
http DELETE localhost:8083/connectors/testconnector
在这之后,你终于可以Ctrl+C了。 您可能需要重新启动 Kafka 和 Zookeeper 进程,然后才能再次尝试运行 Connect。
【讨论】:
以上是关于使用 Ctrl+C 停止 Debezium 连接器并再次重新启动连接器后连接器不起作用的主要内容,如果未能解决你的问题,请参考以下文章
MySQL 的 Debezium 连接器。缺少数据库历史主题
kafka connect debezium mongodb连接器无法加载
Debezium 有没有办法停止数据序列化?试图从源代码中获取值
创建 mongo debezium 连接器时出现身份验证错误