Kafka Connect:使用 debezium 从 Postgres 流式传输更改到主题

Posted

技术标签:

【中文标题】Kafka Connect:使用 debezium 从 Postgres 流式传输更改到主题【英文标题】:Kafka Connect: streaming changes from Postgres to topics using debezium 【发布时间】:2021-09-19 12:13:13 【问题描述】:

我对 Kafka 和 Kafka Connect 世界还很陌生。我正在尝试使用 Kafka(在 MSK 上)、Kafka Connect(使用 PostgreSQL 的 Debezium 连接器)和 RDS Postgres 实例来实现 CDC。 Kafka Connect 在我们部署在 AWS 的集群中的 K8 pod 中运行。

在深入了解使用的配置细节之前,我将尝试总结问题:

连接器启动后,会按预期向主题发送消息(快照) 一旦我们对表进行任何更改(创建、更新、删除),就不会向该主题发送任何消息。我们希望看到有关对表格所做更改的消息。

我的连接器配置如下:


    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.user": "root",
    "database.dbname": "insights",
    "slot.name": "cdc_organization",
    "tasks.max": "1",
    "column.blacklist": "password, access_key, reset_token",
    "database.server.name": "insights",
    "database.port": "5432",
    "plugin.name": "wal2json_rds_streaming",
    "schema.whitelist": "public",
    "table.whitelist": "public.kafka_connect_cdc_test",
    "key.converter.schemas.enable": "false",
    "database.hostname": "de-test-sre-12373.cbplqnioxomr.eu-west-1.rds.amazonaws.com",
    "database.password": "MYSECRETPWD",
    "value.converter.schemas.enable": "false",
    "name": "source-postgres",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "snapshot.mode": "initial"

我们为plugin.name 属性尝试了不同的配置:wal2josnwal2json_streamingwal2json_rds_streaming

连接器和数据库之间的连接没有问题,因为我们已经看到连接器一启动就有消息流过。

上述连接器是否存在配置问题,导致我们无法看到与主题中出现的新更改相关的消息?

谢谢

【问题讨论】:

日志中有用的东西? 不是真的,我们可以从 pod 日志中看到的唯一内容是:``[2021-07-09 09:18:06,632] INFO WorkerSourceTaskid=source-postgres-0 flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask) @IskuskovAlexander 我们开始认为这可能与为我们的 RDS Postgres 实例设置 WAL 的方式有关。但到目前为止,配置看起来还不错 你为什么使用wal2json插件?不是pgoutput? 因为我们在 postgres 实例上设置了 WAL。 【参考方案1】:

您的连接器配置看起来有点混乱。我对 Kafka 也很陌生,所以我真的不知道这个问题,但这是我的连接器配置,适合我。


   "name":"<connector_name>",
   "config": 
      "connector.class":"io.debezium.connector.postgresql.PostgresConnector",
      "database.server.name":"<server>",
      "database.port":"5432",
      "database.hostname":"<host>",
      "database.user":"<user>",
      "database.dbname":"<password>",
      "tasks.max":"1",
      "database.history.kafka.boostrap.servers":"localhost:9092",
      "database.history.kafka.topic":"<kafka_topic_name>",
      "plugin.name":"pgoutput",
      "include.schema.changes":"true"
   

如果此配置也不起作用,请尝试查找日志控制台;有时错误不是控制台的最后一次写入

【讨论】:

抱歉,这有什么令人困惑的地方?如果有帮助,我可以澄清要点 我的意思是有太多“无用”的值,试试这个示例,然后添加你需要的,表白名单,ecc...

以上是关于Kafka Connect:使用 debezium 从 Postgres 流式传输更改到主题的主要内容,如果未能解决你的问题,请参考以下文章

无法在启用 SSL 的 Kafka 集群中注册 Debezium (Kafka-Connect) 连接器

Kafka Connect 对 debezium 生成事件的日期处理

如何将 kafka-connect-jdbc-5.5.0.jar 添加到 Debezium/connect

Debezium 如何使用 Kafka Connect 正确注册 SqlServer 连接器 - 连接被拒绝

如何通过 Debezium Connect 反序列化来自 Kafka 消息流的几何字段?

如何使用debezium更改数据捕获在mysql中捕获数据并在kafka connect中使用jdbc sink?