debezium - 更改主题名称会导致错误跨数据库引用
Posted
技术标签:
【中文标题】debezium - 更改主题名称会导致错误跨数据库引用【英文标题】:debezium - change of topic name gives the error cross-database references 【发布时间】:2019-06-22 10:15:08 【问题描述】:我正在使用这个debezium-examples
source.json
"name": "inventory-connector",
"config":
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.whitelist": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
jdbc-sink.json
"name": "jdbc-sink",
"config":
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "customers",
"connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"auto.create": "true",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_value"
我已经运行了这个示例,它工作正常。但是当我按照以下场景中的讨论进行了一些更改时。它给了我“跨数据库引用”错误。
情景
我已从源中删除这些属性
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
现在它在kafka中创建主题如下
dbServer1.inventory.products
dbserver1.inventory.products_on_hand
dbserver1.inventory.customers
dbserver1.inventory.orders
当我在 jdbc-sink 中指定 topic= dbserver1.inventory.customers
时,它给了我以下 exception
ERROR: cross-database references are not implemented:
"dbserver1.inventory.customers" at character 14
postgres_1 | STATEMENT: CREATE TABLE "dbserver1"."inventory"."customers" (
postgres_1 | "last_name" TEXT NOT NULL,
postgres_1 | "id" INT NOT NULL,
postgres_1 | "first_name" TEXT NOT NULL,
postgres_1 | "email" TEXT NOT NULL,
postgres_1 | PRIMARY KEY("id"))
connect_1 | 2019-01-29 09:39:18,931 WARN || Create failed, will attempt amend if table already exists [io.confluent.connect.jdbc.sink.DbStructure]
connect_1 | org.postgresql.util.PSQLException: ERROR: cross-database references are not implemented: "dbserver1.inventory.customers"
connect_1 | Position: 14
注意:它不是重复的,因为我也发布了其他问题,涵盖了不同的场景
【问题讨论】:
debezium confluent - change of transforms.route.replacement gives the SinkRecordField error的可能重复 它不重复,因为我也发布了其他问题,涵盖了不同的场景 我不知道 Postgres,但是否支持(name).(name).(name)
格式?一个是数据库,另一个是表……第三个是什么?
【参考方案1】:
table.name.format
sink 属性为我解决了这个问题。它允许您覆盖目标表名称。见https://docs.confluent.io/3.1.1/connect/connect-jdbc/docs/sink_config_options.html
【讨论】:
【参考方案2】:更改库存 -> dbserver1
(databasename).(schemaname).(tablename)
"name": "jdbc-sink",
"config":
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "customers",
"connection.url": "jdbc:postgresql://postgres:5432/dbserver1?user=postgresuser&password=postgrespw",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"auto.create": "true",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_value"
【讨论】:
以上是关于debezium - 更改主题名称会导致错误跨数据库引用的主要内容,如果未能解决你的问题,请参考以下文章
多表 Debezium 连接器是不是保证跨更改跟踪表的排序?
Kafka Connect:使用 debezium 从 Postgres 流式传输更改到主题