如何在单个 Postgres 服务器中将 Embedded Debezium 用于多个数据库?
Posted
技术标签:
【中文标题】如何在单个 Postgres 服务器中将 Embedded Debezium 用于多个数据库?【英文标题】:How to use Embedded Debezium for multiple databases in a single Postgres server? 【发布时间】:2021-11-01 01:33:33 【问题描述】:假设我们有两个微服务:service_A
和 service_B
。
每个都有自己的数据库(分别为db_a
和db_b
),位于单个 Postgres 服务器实例中(这只是一个暂存环境,因此我们没有集群)。
还有另一个服务service_debezium
(带有Embedded Debezium v1.6.1Final)应该监听db_a
和db_b
的变化。所以基本上这个服务中配置了两个 Debezium 引擎。
但不知何故,service_debezium
无法同时收听db_a
和db_b
。由于某种原因,它只监听其中一个,并且没有错误日志。
此外,如果我配置 service_debezium
(即它的 Debezium 引擎)来监听 db_a
或 db_b
,它会按预期工作,所以我确定它们的配置属性是正确的,并且(当有只有一个引擎)一切正常。
-
那么为什么我们不能使用多个 Debezium 引擎来监听单个 Postgres 服务器中的多个数据库呢?我在这里错过了什么?
我认为的另一种选择是只使用一个 Debezium 引擎来侦听该 Postgres 服务器实例中的所有数据库,但显然它在其配置中需要
database.dbname
,所以我想首选的方法是为每个数据库定义一个新的 Debezium 引擎.对吗?
这是service_debezium
中的Debezium配置:
db_a
配置 bean:
@Bean
public io.debezium.config.Configuration dbAConnector() throws IOException
File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
File dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat");
return io.debezium.config.Configuration.create()
.with("name", "db_a_connector")
.with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
.with("offset.flush.interval.ms", "60000")
.with("database.hostname", "localhost")
.with("database.port", 5432)
.with("database.user", "postgres")
.with("database.password", "*****")
.with("database.dbname", "db_a")
.with("table.whitelist", "public.dummy_table,public.another_dummy_table")
.with("plugin.name", "pgoutput")
.with("slot.name", "db_a_connector")
.with("database.server.name", "db_a_server")
.with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
.with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
.build();
db_b
配置 bean:
@Bean
public io.debezium.config.Configuration dbBConnector() throws IOException
File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
File dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat");
return io.debezium.config.Configuration.create()
.with("name", "db_b_connector")
.with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
.with("offset.flush.interval.ms", "60000")
.with("database.hostname", localhost)
.with("database.port", 5432)
.with("database.user", "postgres")
.with("database.password", "*****")
.with("database.dbname", "db_b")
.with("table.whitelist", "public.yet_another_table")
.with("plugin.name", "pgoutput")
.with("slot.name", "db_b_connector")
.with("database.server.name", "db_b_server")
.with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
.with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
.build();
这是我在service_debezium
中创建 Debezium 引擎实例的方法:
db_a
听众:
@Component
public class DBAListener
public DBAListener(
@Qualifier("dbAConnector") Configuration connectorConfiguration /*, ... other services */)
this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
.using(connectorConfiguration.asProperties())
.notifying(this::handleChangeEvent)
.build();
// ...
// ...
和类似的db_b
听众:
@Component
public class DBBListener
public DBBListener(
@Qualifier("dbBConnector") Configuration connectorConfiguration /*, ... other services */)
this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
.using(connectorConfiguration.asProperties())
.notifying(this::handleChangeEvent)
.build();
// ...
// ...
postgresql.conf
:
# ...other conf
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
# max number of walsender processes (i.e. # of replica databases + # of Debezium engines) (change requires restart)
# Abrupt streaming client disconnection might cause an orphaned connection slot until a timeout is reached,
# so this parameter should be set slightly higher than the maximum number of expected clients
# so disconnected clients can immediately reconnect
max_wal_senders = 3
max_replication_slots = 3 # max number of replication slots (change requires restart)
提前致谢!
更新:根据@Yuri Niitsuma's answer更新了代码sn-ps给出一个完整的例子。
【问题讨论】:
【参考方案1】:当您创建 debezium 连接器时,它会创建一个默认名称为“debezium”的复制槽。然后您尝试创建另一个实例并尝试创建一个具有相同名称的复制槽,并且不能使用相同的复制槽同时使用两个实例,这将引发错误。这是不好的解释,但我会给出解决方案。
在每个连接器上添加此配置:
在 dbAConnector 上
.with("slot.name", "dbAConnector")
还有 dbBConnector
.with("slot.name", "dbBConnector")
您可以列出可用的复制槽:
SELECT * FROM pg_replication_slots;
您可以删除未使用的复制槽,例如默认名称“debezium”:
SELECT pg_drop_replication_slot('debezium');
因为当没有人使用这个插槽时会消耗磁盘。
【讨论】:
是的,就是这样。太感谢了!我将更新代码 sn-ps 以给出一个完整的示例。这是相关的doc,以防有人像我一样错过它。以上是关于如何在单个 Postgres 服务器中将 Embedded Debezium 用于多个数据库?的主要内容,如果未能解决你的问题,请参考以下文章
如何在python中将单引号存储到postgres [重复]
如何在 Postgres-XL 中将 SEQUENCE 设置为 DEFAULT?
如何在 Heroku 中将 Kafka 连接到 Postgres