Debezium - MySQL Connector - Kinesis - Service Not Starting

Posted: 2020-11-07 22:17:54

Question:

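I am trying to get Debezium to send my CDC events to Kinesis. The service appears to start, but it logs errors and nothing seems to reach Kinesis. I followed this setup guide on the Debezium website: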

https://debezium.io/documentation/reference/1.2/operations/debezium-server.html

Here is my configuration file:

debezium.sink.type=kinesis
debezium.sink.kinesis.region=us-east-1
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=127.0.0.1
debezium.source.database.id=12345
debezium.source.database.port=3306
debezium.source.database.user=*****
debezium.source.database.password=******
debezium.source.database.dbname=inventory
debezium.source.database.server.name=127.0.0.1
debezium.source.schema.whitelist=inventory

When I run the server with ./run.sh, it begins to start up and then fails with errors:

2020-07-17 19:11:22,380 INFO  [io.deb.ser.BaseChangeConsumer] (main) Using 'io.debezium.server.BaseChangeConsumer$$Lambda$81/694452085@2abf4075' stream name mapper
2020-07-17 19:11:22,754 INFO  [io.deb.ser.kin.KinesisChangeConsumer] (main) Using default KinesisClient 'software.amazon.awssdk.services.kinesis.DefaultKinesisClient@5d5f10b2'
2020-07-17 19:11:22,755 INFO  [io.deb.ser.DebeziumServer] (main) Consumer 'io.debezium.server.kinesis.KinesisChangeConsumer' instantiated
2020-07-17 19:11:22,926 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
    converter.type = key
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = true

2020-07-17 19:11:22,928 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
    converter.type = value
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = false

2020-07-17 19:11:22,936 INFO  [io.deb.emb.EmbeddedEngine$EmbeddedConfig] (main) EmbeddedConfig values:
    access.control.allow.methods =
    access.control.allow.origin =
    admin.listeners = null
    bootstrap.servers = [localhost:9092]
    client.dns.lookup = default
    config.providers = []
    connector.client.config.override.policy = None
    header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
    internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
    internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
    key.converter = class org.apache.kafka.connect.json.JsonConverter
    listeners = null
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    offset.flush.interval.ms = 0
    offset.flush.timeout.ms = 5000
    offset.storage.file.filename = data/offsets.dat
    offset.storage.partitions = null
    offset.storage.replication.factor = null
    offset.storage.topic =
    plugin.path = null
    rest.advertised.host.name = null
    rest.advertised.listener = null
    rest.advertised.port = null
    rest.extension.classes = []
    rest.host.name = null
    rest.port = 8083
    ssl.client.auth = none
    task.shutdown.graceful.timeout.ms = 5000
    topic.tracking.allow.reset = true
    topic.tracking.enable = true
    value.converter = class org.apache.kafka.connect.json.JsonConverter

2020-07-17 19:11:22,937 INFO  [org.apa.kaf.con.run.WorkerConfig] (main) Worker configuration property 'internal.key.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2020-07-17 19:11:22,937 INFO  [org.apa.kaf.con.run.WorkerConfig] (main) Worker configuration property 'internal.value.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2020-07-17 19:11:22,939 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
    converter.type = key
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = true

2020-07-17 19:11:22,940 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
    converter.type = value
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = true

2020-07-17 19:11:22,942 INFO  [io.deb.ser.DebeziumServer] (main) Engine executor started
2020-07-17 19:11:22,948 INFO  [org.apa.kaf.con.sto.FileOffsetBackingStore] (pool-3-thread-1) Starting FileOffsetBackingStore with file data/offsets.dat
2020-07-17 19:11:22,989 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) Starting MySqlConnectorTask with configuration:
2020-07-17 19:11:22,990 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    connector.class = io.debezium.connector.mysql.MySqlConnector
2020-07-17 19:11:22,990 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    offset.flush.interval.ms = 0
2020-07-17 19:11:22,990 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.user = mysqluser
2020-07-17 19:11:22,990 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.dbname = inventory
2020-07-17 19:11:22,990 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    offset.storage.file.filename = data/offsets.dat
2020-07-17 19:11:22,990 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.hostname = 127.0.0.1
2020-07-17 19:11:22,990 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.id = 12345
2020-07-17 19:11:22,990 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.password = ********
2020-07-17 19:11:22,990 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    name = kinesis
2020-07-17 19:11:22,990 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.server.name = 127.0.0.1
2020-07-17 19:11:22,990 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.port = 3306
2020-07-17 19:11:22,991 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    schema.whitelist = inventory
2020-07-17 19:11:23,063 INFO  [io.quarkus] (main) debezium-server-dist 1.2.0.Final on JVM (powered by Quarkus 1.5.0.Final) started in 1.177s. Listening on: http://0.0.0.0:8080
2020-07-17 19:11:23,065 INFO  [io.quarkus] (main) Profile prod activated.
2020-07-17 19:11:23,065 INFO  [io.quarkus] (main) Installed features: [cdi, smallrye-health]
2020-07-17 19:11:23,276 ERROR [io.deb.rel.his.KafkaDatabaseHistory] (pool-3-thread-1) The 'database.history.kafka.topic' value is invalid: A value is required
2020-07-17 19:11:23,276 ERROR [io.deb.rel.his.KafkaDatabaseHistory] (pool-3-thread-1) The 'database.history.kafka.bootstrap.servers' value is invalid: A value is required
2020-07-17 19:11:23,277 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) Stopping down connector
2020-07-17 19:11:23,277 INFO  [io.deb.con.mys.MySqlConnectorTask] (pool-3-thread-1) Stopping MySQL connector task
2020-07-17 19:11:23,278 INFO  [org.apa.kaf.con.sto.FileOffsetBackingStore] (pool-3-thread-1) Stopped FileOffsetBackingStore
2020-07-17 19:11:23,278 INFO  [io.deb.ser.ConnectorLifecycle] (pool-3-thread-1) Connector completed: success = 'false', message = 'Unable to initialize and start connector's task class 'io.debezium.connector.mysql.MySqlConnectorTask' with config: name=kinesis, connector.class=io.debezium.connector.mysql.MySqlConnector, database.id=12345, schema.whitelist=inventory, database.port=3306, database.user=username, database.hostname=127.0.0.1, offset.storage.file.filename=data/offsets.dat, database.password=********, offset.flush.interval.ms=0, database.server.name=127.0.0.1, database.dbname=inventory', error = '': org.apache.kafka.connect.errors.ConnectException: Error configuring an instance of KafkaDatabaseHistory; check the logs for details
    at io.debezium.relational.history.KafkaDatabaseHistory.configure(KafkaDatabaseHistory.java:180)
    at io.debezium.connector.mysql.MySqlSchema.<init>(MySqlSchema.java:139)
    at io.debezium.connector.mysql.MySqlTaskContext.<init>(MySqlTaskContext.java:86)
    at io.debezium.connector.mysql.MySqlTaskContext.<init>(MySqlTaskContext.java:52)
    at io.debezium.connector.mysql.MySqlConnectorTask.createAndStartTaskContext(MySqlConnectorTask.java:357)
    at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:143)
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:101)
    at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:756)
    at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:170)
    at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:133)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

^C2020-07-18 00:00:44,245 INFO  [io.deb.ser.DebeziumServer] (main) Received request to stop the engine
2020-07-18 00:00:44,245 INFO  [io.deb.emb.EmbeddedEngine] (main) Stopping the embedded engine
2020-07-18 00:00:44,254 INFO  [io.quarkus] (main) debezium-server-dist stopped in 0.028s

About three quarters of the way down the log there are two errors related to io.deb.rel.his.KafkaDatabaseHistory.

Since I am using a Kinesis sink, I am not sure why it throws these Kafka-related errors. Does anyone know why this happens, or how to fix it?


Answer 1:

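You need to specify the debezium.source.database.history property for the MySQL connector. Its default value is io.debezium.relational.history.KafkaDatabaseHistory, which is why the connector asks for Kafka settings even though the sink is Kinesis. For non-Kafka deployments, set it to one of the following values: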

io.debezium.relational.history.FileDatabaseHistory (together with the debezium.source.database.history.file.filename property);
io.debezium.relational.history.MemoryDatabaseHistory, for test environments.
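
As a minimal sketch of this fix, the two lines below could be appended to the configuration file from the question. The path data/history.dat is an assumption chosen for illustration (it mirrors data/offsets.dat); it does not come from the original post, and any writable path should do.

```properties
# Keep the MySQL schema history in a local file instead of a Kafka topic.
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
# Hypothetical location, placed next to data/offsets.dat from the question.
debezium.source.database.history.file.filename=data/history.dat
```

For a throwaway test setup, the first property could instead be set to io.debezium.relational.history.MemoryDatabaseHistory, in which case the filename property is not needed.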

Comments:

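Thanks! That did it. As a follow-up, is there a way to point the Kinesis connection at my localhost so I can test against my localstack instance? I don't see any parameter that allows redirecting Kinesis.

You're welcome! If my answer helped you, please accept and upvote it.

I don't think you can do that, but you can try asking in the Debezium chat: gitter.im/debezium/user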
