Kafka Connect JDBC Sink - 一个接收器配置中每个主题(表)的 pk.fields

Posted

技术标签:

【中文标题】Kafka Connect JDBC Sink - 一个接收器配置中每个主题(表)的 pk.fields【英文标题】:Kafka Connect JDBC Sink - pk.fields for each topic (table) in one sink configuration 【发布时间】:2019-06-23 14:54:56 【问题描述】:

关于这个例子debezium-example

我有多个具有不同主键的主题

item (pk : id)
itemDetail (pk :id, itemId)
itemLocation (pk :id, itemId)

jdbc-sink.source


"name": "jdbc-sink",
"config": 
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "item,itemDetail,itemLocation",
    "connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_value"


我们如何为每个主题(表)指定“pk.fields”?

【问题讨论】:

【参考方案1】:

我认为每个主题的 PK 映射没有这样的配置。

您需要为每个主题进行多个配置


"name": "jdbc-sink-item",
"config": 
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "item",
    "pk.fields": "id",


"name": "jdbc-sink-itemDetail",
"config": 
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "itemDetail",
    "pk.fields": "id,itemId",

等等

【讨论】:

嗨,我们有不止一个主题,并提供了接收器连接器作为逗号列表。现在质疑我们是否有每个表不同的主键然后“pk.fields”:“id,itemId”,可以为我们工作吗?如果我们每个表都有不同的主键?

以上是关于Kafka Connect JDBC Sink - 一个接收器配置中每个主题(表)的 pk.fields的主要内容,如果未能解决你的问题,请参考以下文章

使用 Kafka Connect API JDBC Sink 连接器示例到 Oracle 数据库的 Kafka 主题

Kafka Connect JDBC Sink 连接器:如何删除没有 NULL 值的记录?

kafka jdbc sink连接器抛出org.apache.kafka.connect.errors.DataException(结构模式的字段名称未正确指定)插入PG表

Kafka Connect JDBC Sink - 一个接收器配置中每个主题(表)的 pk.fields

Kafka Connect - JSON 转换器 - JDBC Sink 连接器 - 列类型 JSON

如何使用debezium更改数据捕获在mysql中捕获数据并在kafka connect中使用jdbc sink?