如何在单个 Postgres 服务器中将 Embedded Debezium 用于多个数据库?

Posted

技术标签:

【中文标题】如何在单个 Postgres 服务器中将 Embedded Debezium 用于多个数据库?【英文标题】:How to use Embedded Debezium for multiple databases in a single Postgres server? 【发布时间】:2021-11-01 01:33:33 【问题描述】:

假设我们有两个微服务:service_Aservice_B。 每个都有自己的数据库(分别为db_adb_b),位于单个 Postgres 服务器实例中(这只是一个暂存环境,因此我们没有集群)。

还有另一个服务service_debezium(带有Embedded Debezium v​​1.6.1Final)应该监听db_adb_b的变化。所以基本上这个服务中配置了两个 Debezium 引擎。

但不知何故,service_debezium 无法同时收听db_adb_b。由于某种原因,它只监听其中一个,并且没有错误日志。

此外,如果我配置 service_debezium(即它的 Debezium 引擎)来监听 db_adb_b,它会按预期工作,所以我确定它们的配置属性是正确的,并且(当有只有一个引擎)一切正常。

    那么为什么我们不能使用多个 Debezium 引擎来监听单个 Postgres 服务器中的多个数据库呢?我在这里错过了什么? 我认为的另一种选择是只使用一个 Debezium 引擎来侦听该 Postgres 服务器实例中的所有数据库,但显然它在其配置中需要database.dbname,所以我想首选的方法是为每个数据库定义一个新的 Debezium 引擎.对吗?

这是service_debezium中的Debezium配置:

db_a 配置 bean:
  @Bean
  public io.debezium.config.Configuration dbAConnector() throws IOException 
    File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
    File dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat");
    return io.debezium.config.Configuration.create()
        .with("name", "db_a_connector")
        .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
        .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
        .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
        .with("offset.flush.interval.ms", "60000")
        .with("database.hostname", "localhost")
        .with("database.port", 5432)
        .with("database.user", "postgres")
        .with("database.password", "*****")
        .with("database.dbname", "db_a")
        .with("table.whitelist", "public.dummy_table,public.another_dummy_table")
        .with("plugin.name", "pgoutput")
        .with("slot.name", "db_a_connector")
        .with("database.server.name", "db_a_server")
        .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
        .with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
        .build();
  
db_b 配置 bean:
  @Bean
  public io.debezium.config.Configuration dbBConnector() throws IOException 
    File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
    File dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat");
    return io.debezium.config.Configuration.create()
        .with("name", "db_b_connector")
        .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
        .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
        .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
        .with("offset.flush.interval.ms", "60000")
        .with("database.hostname", localhost)
        .with("database.port", 5432)
        .with("database.user", "postgres")
        .with("database.password", "*****")
        .with("database.dbname", "db_b")
        .with("table.whitelist", "public.yet_another_table")
        .with("plugin.name", "pgoutput")
        .with("slot.name", "db_b_connector")
        .with("database.server.name", "db_b_server")
        .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
        .with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
        .build();
  

这是我在service_debezium 中创建 Debezium 引擎实例的方法:

db_a听众:
@Component
public class DBAListener 

  public DBAListener(
      @Qualifier("dbAConnector") Configuration connectorConfiguration /*, ... other services */) 

    this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
        .using(connectorConfiguration.asProperties())
        .notifying(this::handleChangeEvent)
        .build();

    // ...
  

  // ...


和类似的db_b听众:
@Component
public class DBBListener 

  public DBBListener(
      @Qualifier("dbBConnector") Configuration connectorConfiguration /*, ... other services */) 

    this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
        .using(connectorConfiguration.asProperties())
        .notifying(this::handleChangeEvent)
        .build();

    // ...
  

  // ...


postgresql.conf:
# ...other conf

wal_level = logical             # minimal, archive, hot_standby, or logical (change requires restart)
# max number of walsender processes (i.e. # of replica databases + # of Debezium engines) (change requires restart)
# Abrupt streaming client disconnection might cause an orphaned connection slot until a timeout is reached, 
# so this parameter should be set slightly higher than the maximum number of expected clients 
# so disconnected clients can immediately reconnect
max_wal_senders = 3
max_replication_slots = 3       # max number of replication slots (change requires restart)

提前致谢!

更新:根据@Yuri Niitsuma's answer更新了代码sn-ps给出一个完整的例子。

【问题讨论】:

【参考方案1】:

当您创建 debezium 连接器时,它会创建一个默认名称为“debezium”的复制槽。然后您尝试创建另一个实例并尝试创建一个具有相同名称的复制槽,并且不能使用相同的复制槽同时使用两个实例,这将引发错误。这是不好的解释,但我会给出解决方案。

在每个连接器上添加此配置:

在 dbAConnector 上

.with("slot.name", "dbAConnector")

还有 dbBConnector

.with("slot.name", "dbBConnector")

您可以列出可用的复制槽:

SELECT * FROM pg_replication_slots;

您可以删除未使用的复制槽,例如默认名称“debezium”:

SELECT pg_drop_replication_slot('debezium');

因为当没有人使用这个插槽时会消耗磁盘。

【讨论】:

是的,就是这样。太感谢了!我将更新代码 sn-ps 以给出一个完整的示例。这是相关的doc,以防有人像我一样错过它。

以上是关于如何在单个 Postgres 服务器中将 Embedded Debezium 用于多个数据库?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Postgres 中将一个类型拆分为多个列?

如何在python中将单引号存储到postgres [重复]

如何在 Postgres-XL 中将 SEQUENCE 设置为 DEFAULT?

如何在 Heroku 中将 Kafka 连接到 Postgres

如何使用 Slick 在 Postgres 中将 json 对象插入到 jsonb 类型的列中

如何使用复制命令在 postgres 中将数据从一个表复制到另一个表