MySQL使用Debezium更改为Kafka-仅捕获DDL stmts
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MySQL使用Debezium更改为Kafka-仅捕获DDL stmts相关的知识,希望对你有一定的参考价值。
为CDC配置了mysql-Debezium。它正在捕获DDL更改(如创建/删除表),但没有捕获DML事件。使用MySQL 8.0.11和Embedded debezium 0.8.3.Final。
[创建表时,没有在MySQL服务器中完成其他配置。
使用以下代码创建配置bean
@Bean
public io.debezium.config.Configuration customerConnector()
return io.debezium.config.Configuration.create()
.with(EmbeddedEngine.CONNECTOR_CLASS, "io.debezium.connector.mysql.MySqlConnector")
.with(EmbeddedEngine.OFFSET_STORAGE, "org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with(EmbeddedEngine.OFFSET_STORAGE_FILE_FILENAME, "path-to-file")
.with("offset.flush.interval.ms", 60000)
.with(EmbeddedEngine.ENGINE_NAME, "customer-mysql-connector")
.with(MySqlConnectorConfig.SERVER_NAME, databaseServer)
.with(MySqlConnectorConfig.HOSTNAME, databaseServer)
.with(MySqlConnectorConfig.PORT, databasePort)
.with(MySqlConnectorConfig.USER, databaseUser)
.with(MySqlConnectorConfig.PASSWORD, databasePassword)
.with(MySqlConnectorConfig.DATABASE_WHITELIST, databaseSchemaName)
.with(MySqlConnectorConfig.TABLE_WHITELIST, databaseTable)
.with(MySqlConnectorConfig.DATABASE_HISTORY,
MemoryDatabaseHistory.class.getName()).build();
下面是作为Springboot应用程序启动时的日志
2020-05-29 21:24:28.028 INFO 5576 --- [pool-1-thread-1] i.d.connector.mysql.MySqlConnectorTask : MySQL has the binlog file 'binlog.000009' required by the connector
2020-05-29 21:24:28.072 INFO 5576 --- [pool-1-thread-1] io.debezium.util.Threads : Requested thread factory for connector MySqlConnector, id = localhost named = binlog-client
2020-05-29 21:24:28.074 INFO 5576 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-05-29 21:24:28.074 INFO 5576 --- [pool-1-thread-1] io.debezium.util.Threads : Creating thread debezium-mysqlconnector-localhost-binlog-client
2020-05-29 21:24:28.090 INFO 5576 --- [-localhost:3306] io.debezium.util.Threads : Creating thread debezium-mysqlconnector-localhost-binlog-client
2020-05-29 21:24:28.121 INFO 5576 --- [-localhost:3306] c.g.shyiko.mysql.binlog.BinaryLogClient : Connected to localhost:3306 at binlog.000009/3786 (sid:6293, cid:36)
2020-05-29 21:24:28.121 INFO 5576 --- [-localhost:3306] i.debezium.connector.mysql.BinlogReader : Connected to MySQL binlog at localhost:3306, starting at binlog file 'binlog.000009', pos=3786, skipping 8 events plus 0 rows
2020-05-29 21:24:28.121 INFO 5576 --- [-localhost:3306] io.debezium.util.Threads : Creating thread debezium-mysqlconnector-localhost-binlog-client
2020-05-29 21:24:28.183 INFO 5576 --- [ main] d.s.w.p.DocumentationPluginsBootstrapper : Context refreshed
2020-05-29 21:24:28.199 INFO 5576 --- [ main] d.s.w.p.DocumentationPluginsBootstrapper : Found 1 custom documentation plugin(s)
2020-05-29 21:24:28.199 INFO 5576 --- [ main] s.d.s.w.s.ApiListingReferenceScanner : Scanning for api listing references
任何线索?谢谢!
答案
[table.whitelist
应设置为<schema>.<table>
,因此在您的情况下为source.customer
以上是关于MySQL使用Debezium更改为Kafka-仅捕获DDL stmts的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 debezium mysql 连接器 kafka 进行初始快照加载?
如何使用debezium更改数据捕获在mysql中捕获数据并在kafka connect中使用jdbc sink?
干货 | Debezium实现Mysql到Elasticsearch高效实时同步