Debezium Postgres Kafka 没有创建主题

Posted

技术标签:

【中文标题】Debezium Postgres Kafka 没有创建主题【英文标题】:Debezium Postgres Kafka not creating topic 【发布时间】:2021-10-27 06:58:30 【问题描述】:

我是 kafka 的新手,我正在使用 debezium kafka 来跟踪我的 postgrest 表中的变化。以下是我的 docker-complse.yml

version: '3.8'

volumes:
  shared-workspace:
    name: "hadoop-distributed-file-system"
    driver: local
    
services: 
  postgres:
    restart: always
    image: debezium/postgres
    container_name: postgres
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_PASSWORD=mosip123
      - POSTGRES_DB=anonprofile
    # to activate WAL 
    # command: postgres -c wal_level=logical -c archive_mode=on -c max_wal_senders=5
    
    volumes:
      - shared-workspace:/opt/workspace
      - ./PostgresDB:/docker-entrypoint-initdb.d/
  
  zookeeper:
    image: debezium/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"
    container_name: zookeeper
    volumes:
      - shared-workspace:/opt/workspace

  kafka:
    image: debezium/kafka
    container_name: kafka
    ports:
      - "9092:9092"
      - "29092:29092"
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=LISTENER_EXT://localhost:29092,LISTENER_INT://kafka:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=LISTENER_INT:PLAINTEXT,LISTENER_EXT:PLAINTEXT
      - KAFKA_LISTENERS=LISTENER_INT://0.0.0.0:9092,LISTENER_EXT://0.0.0.0:29092
      - KAFKA_INTER_BROKER_LISTENER_NAME=LISTENER_INT
    volumes:
      - shared-workspace:/opt/workspace    
  
  connect:
    image: debezium/connect
    container_name: connect
    ports:
      - "8083:8083"
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
    depends_on:
      - zookeeper
      - kafka
    volumes:
      - shared-workspace:/opt/workspace

postgres 容器内的 shell 脚本。 请注意数据类型是JSON,如果这是错误的来源?

#!/bin/bash
apt-get update && apt-get install postgresql-13-pgoutput
psql -U postgres -d anonprofile <<-EOSQL
    CREATE TABLE IF NOT EXISTS anon_profiles (id SERIAL PRIMARY KEY, profiledata JSON );
    ALTER TABLE anon_profiles REPLICA IDENTITY USING INDEX anon_profiles_pkey;
    ALTER SYSTEM SET wal_level to 'logical';
EOSQL

连接器 json 文件

 "name": "anonprofile-connector",
  "config": 
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "user",
    "database.password": "mosip123",
    "database.dbname" : "anonprofile",
    "database.server.name": "MOSIP",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "database.history.kafka.bootstrap.servers": "kafka:29092",
    "database.history.kafka.topic": "schema-changes.anon_profiles",
    "plugin.name": "pgoutput",
    "publication.autocreate.mode": "all_tables",
    "publication.name": "my_publication",
    "snapshot.mode": "always"
  

设置完所有内容后,我没有发现任何错误,但通过检查主题列表,没有为上述 postgres 连接创建任何主题。我错过了什么吗?

主题列表

$docker exec -it \
  $(docker ps | grep kafka | awk ' print $1 ') \
  /kafka/bin/kafka-topics.sh \
    --bootstrap-server localhost:9092 --list
__consumer_offsets
my_connect_configs
my_connect_offsets
my_connect_statuses

【问题讨论】:

您在哪里搜索错误?您是否禁用了自动创建主题?在启动 Debezium 之前创建主题会发生什么 - 它真的运行了吗? @OneCricketeer 我没有手动禁用自动主题创建。但是,如果它被禁用,我该如何启用它?不,手动创建主题不会运行。它显示在列表中,但不接受该消息(我的意思是我无法看到其中的数据库更改)。 我是kafka的新手,根据网上提供的资料,我期待主题是自动创建的。 @OneCricketeer 非常感谢,我已经解决了这个问题。我正在做的是在创建 postgres 容器时,我已经通过入口点 shell 脚本创建表(在我运行 CURL 命令连接 kafka 和 postgres 之前),因此没有创建主题。 @OneCricketeer 找到了它,并不是因为朋友帮助了我,而是把它放在这里以防有人遇到问题。 database.user 应该是超级用户(postgres)。之前我的其他用户没有创建复制槽的权限。 【参考方案1】:

问题在于database.user,用户需要有权创建复制槽。将其设置为postgres 就可以了。或者授予用户所需的权限(我想这将使其成为超级用户)。

【讨论】:

以上是关于Debezium Postgres Kafka 没有创建主题的主要内容,如果未能解决你的问题,请参考以下文章

Kafka 连接 Debezium Postgres Cloud SQL

Debezium Postgres Kafka 没有创建主题

需要 Debezium 连接器中用于 postgres 插入事件的主键信息

使用debeziumkafka-connect将postgres数据实时同步到kafka中

使用debeziumkafka-connect将postgres数据实时同步到kafka中,表topic重新路由

Debezium 导致 Postgres 耗尽 RDS 上的磁盘空间