复制槽已经存在

Posted

技术标签:

【中文标题】复制槽已经存在【英文标题】:replication slot already exists 【发布时间】:2018-11-17 16:12:25 【问题描述】:

每当我重新启动 debezium kafka-connect 容器或部署另一个实例时,我都会收到以下错误:

    io.debezium.jdbc.JdbcConnectionException: ERROR: replication slot "debezium" already exists
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initReplicationSlot(PostgresReplicationConnection.java:136)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.<init>(PostgresReplicationConnection.java:79)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.<init>(PostgresReplicationConnection.java:38)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$ReplicationConnectionBuilder.build(PostgresReplicationConnection.java:349)
    at io.debezium.connector.postgresql.PostgresTaskContext.createReplicationConnection(PostgresTaskContext.java:80)
    at io.debezium.connector.postgresql.RecordsStreamProducer.<init>(RecordsStreamProducer.java:75)
    at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:112)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:157)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.postgresql.util.PSQLException: ERROR: replication slot "debezium" already exists
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2412)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2125)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:297)
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:428)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:354)
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:301)
    at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:287)
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:264)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:260)
    at org.postgresql.replication.fluent.logical.LogicalCreateSlotBuilder.make(LogicalCreateSlotBuilder.java:48)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initReplicationSlot(PostgresReplicationConnection.java:102)
    ... 14 more

我正在使用这张图片:https://github.com/debezium/docker-images/tree/master/connect/0.8

并且有这样的配置:

      
   "name":"record-loader-connector",
   "config":  
      "connector.class":"io.debezium.connector.postgresql.PostgresConnector",
      "database.dbname":"record_loader?ssl",
      "database.user":"postgres",
      "database.hostname":"redacted",
      "database.history.kafka.bootstrap.servers":"redacted",
      "database.history.kafka.topic":"dbhistory.recordloader",
      "database.password":"redacted",
      "name":"record-loader-connector",
      "database.server.name":"recordLoaderDb",
      "database.port":"20023",
      "table.whitelist":".*sync"
   ,
   "tasks":[  
        
         "connector":"record-loader-connector",
         "task":0
      
   ],
   "type":"source"

我注意到这两个配置选项(slot.name 和 slot.drop_on_stop),但我不清楚是否/如何更改它们:

http://debezium.io/docs/connectors/postgresql/#connector-properties

【问题讨论】:

【参考方案1】:

如果您部署 Debezium Postgres 连接器的多个实例,您必须确保使用不同的复制槽名称。您可以在设置连接器时指定名称:


    "name": "inventory-connector",
    "config": 
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "postgres",
        "database.server.name": "dbserver1",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "slot.name" : "my-slot-name"
    

我无法重现您在重新启动给定连接器实例时描述的问题。它应该检测到该插槽已经存在并重新使用该插槽(一个可能的原因可能是您也更改了逻辑解码插件(“decoderbufs”与“wal2json”)?如果您有一个复制器,您能在我们的bug tracker 中打开一个条目吗?

要继续,您可以手动删除 Postgres 中的插槽:

select pg_drop_replication_slot('debezium');

【讨论】:

感谢您的回复。因此,当我部署第二个 kafka-connect 实例时,它会获取我发布的现有配置,因此我没有机会为插槽名称指定不同的值。图像以分布式模式运行:github.com/debezium/docker-images/blob/master/connect-base/0.8/…,据我所知,我认为这意味着其他实例会协调。 WDYM 通过“接机”?您应该确保指定一个不同的名称,它将采用您发布到 Kafka Connect 的 JSON 的配置。 另外,当您说“重新启动给定的连接器实例”时,您是指从其他 api 中重新启动,还是实际上重新启动 docker 容器本身?对我来说,是后者导致了复制槽问题。 前者,但在重新启动容器时它也适用于我。如果您能提供重现问题的确切步骤,那就太好了。 re "pick up" - 我 docker 运行一个 kafka-connect 实例,然后发布连接器配置。然后我 docker 运行第二个 kafka-connect 实例,它立即遇到插槽问题,而我没有发布任何配置。

以上是关于复制槽已经存在的主要内容,如果未能解决你的问题,请参考以下文章

Postgres 使用 debezium 创建复制槽失败

AWS RDS PostgreSQL 错误“剩余的连接槽被保留用于非复制超级用户连接”

如何检查文件名是不是已经存在? [复制]

Heroku “psql: FATAL: 剩余的连接槽是为非复制超级用户连接保留的”

在类的实例化期间如何检查该类的对象是不是已经存在,如果存在,则指向已经存在的对象? [复制]

Django django.db.utils.OperationalError: FATAL: 剩余的连接槽是为非复制超级用户连接保留的