Need primary key information in Debezium connector for Postgres insert events

Posted: 2020-12-30 15:35:54

Question:

I am using the Debezium connector for Postgres with Kafka Connect.

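For the row-insert events that the connector writes to Kafka, I need to know which columns are primary-key columns and which are not. Is there a way to get this information?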

Here is a sample insert event as produced in Kafka:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "bucket_type"
          }
        ],
        "optional": true,
        "name": "postgresconfigdb.config.alert_configs.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "bucket_type"
          }
        ],
        "optional": true,
        "name": "postgresconfigdb.config.alert_configs.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "txId"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "lsn"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "xmin"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.postgresql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "postgresconfigdb.config.alert_configs.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1100,
      "bucket_type": 10
    },
    "source": {
      "version": "1.2.0.Final",
      "connector": "postgresql",
      "name": "postgresconfigdb",
      "ts_ms": 1599830887858,
      "snapshot": "true",
      "db": "configdb",
      "schema": "config",
      "table": "alert_configs",
      "txId": 2139888,
      "lsn": 379356048,
      "xmin": null
    },
    "op": "r",
    "ts_ms": 1599830887859,
    "transaction": null
  }
}

The table's columns here are `id` and `bucket_type`, and their values are reported under the JSON path payload->after.

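The per-column "optional" boolean tells you which columns are NOT NULL, but there is nothing that says which columns make up the primary key (in this example, `id`).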
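To illustrate why the value schema alone is not enough, here is a minimal Python sketch that walks the `schema` part of the event above (trimmed to the fields it reads) and reports each column's nullability. It assumes the event was serialized by Kafka Connect's JSON converter with schemas enabled, as in the sample; the function name `column_nullability` is illustrative, not part of any Debezium API.

```python
import json

# Trimmed copy of the sample event above: only the parts this sketch reads.
EVENT = json.loads("""
{
  "schema": {
    "type": "struct",
    "fields": [
      {"type": "struct", "optional": true, "field": "before",
       "fields": [{"type": "int32", "optional": false, "field": "id"},
                  {"type": "int32", "optional": false, "field": "bucket_type"}]},
      {"type": "struct", "optional": true, "field": "after",
       "fields": [{"type": "int32", "optional": false, "field": "id"},
                  {"type": "int32", "optional": false, "field": "bucket_type"}]}
    ]
  },
  "payload": {"before": null, "after": {"id": 1100, "bucket_type": 10}}
}
""")


def column_nullability(event: dict) -> dict:
    """Map each column of the "after" row struct to its "optional" flag (True = nullable)."""
    envelope_fields = event["schema"]["fields"]
    after_schema = next(f for f in envelope_fields if f["field"] == "after")
    return {col["field"]: col["optional"] for col in after_schema["fields"]}


print(column_nullability(EVENT))  # {'id': False, 'bucket_type': False}
```

Both columns come out as NOT NULL, so nullability cannot distinguish the key column `id` from the ordinary column `bucket_type`.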

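Comments on the question:

Would a manual approach work for you? You could use a Kafka Connect SMT to extract the PK fields into the message key and then read them on the consumer side.

Answer 1: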

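You can find out which fields are PK columns from the Kafka message key. Debezium puts the table's primary-key columns into the key of each change event, so the fields in the key (and in the key's schema) are exactly the PK columns.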
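On the consumer side, this could look roughly like the following Python sketch. It assumes the key was serialized by the JSON converter; the key record below is hypothetical (it follows Debezium's `<server>.<schema>.<table>.Key` naming, but it is not captured from the poster's topic), and the helper names `primary_key_columns` and `split_row` are illustrative.

```python
import json

# Hypothetical key record for the alert_configs row above (JSON converter, schemas enabled).
KEY = json.loads("""
{
  "schema": {
    "type": "struct",
    "fields": [{"type": "int32", "optional": false, "field": "id"}],
    "optional": false,
    "name": "postgresconfigdb.config.alert_configs.Key"
  },
  "payload": {"id": 1100}
}
""")

# Row image taken from payload->after of the sample value event.
AFTER = {"id": 1100, "bucket_type": 10}


def primary_key_columns(key_record) -> list:
    """Return the names of the key fields, i.e. the primary-key columns."""
    if key_record is None:
        # Assumption: a table without a primary key may produce a null key.
        return []
    if "schema" in key_record and "payload" in key_record:
        # Schemas enabled: read the field names from the key's struct schema.
        return [f["field"] for f in key_record["schema"]["fields"]]
    # Schemas disabled: the key is just a flat JSON object of PK columns.
    return list(key_record.keys())


def split_row(key_record, row: dict) -> tuple:
    """Split a row image into (primary-key columns, all other columns)."""
    pk = set(primary_key_columns(key_record))
    pk_part = {c: v for c, v in row.items() if c in pk}
    rest = {c: v for c, v in row.items() if c not in pk}
    return pk_part, rest


print(primary_key_columns(KEY))  # ['id']
print(split_row(KEY, AFTER))     # ({'id': 1100}, {'bucket_type': 10})
```

Reading the column names from the key rather than from the value keeps the consumer independent of the table layout: whatever Debezium places in the key is treated as the key, and everything else in `after` is treated as a regular column.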
