How to create a subject for ksqlDB from a Kafka topic

【Published】: 2021-09-09 21:33:21 【Question】:

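I use a MySQL database. Suppose I have an orders table. Using the Debezium MySQL connector for Kafka, the orders topic was created. But I run into a problem when creating a stream in ksqlDB.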

CREATE STREAM orders WITH (
    kafka_topic = 'myserver.mydatabase.orders',
    value_format = 'avro'
);

My docker-compose file looks like this:

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    privileged: true
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - '9092:9092'
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1


  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    container_name: schema-registry
    depends_on:
      - kafka
      - zookeeper
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"
      SCHEMA_REGISTRY_HOST_NAME: schema-registry

  kafka-connect:
    hostname: kafka-connect
    image: confluentinc/cp-kafka-connect:latest
    container_name: kafka-connect
    ports:
      - 8083:8083
    depends_on:
      - schema-registry
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "quickstart-avro"
      CONNECT_CONFIG_STORAGE_TOPIC: "quickstart-avro-config"
      CONNECT_OFFSET_STORAGE_TOPIC: "quickstart-avro-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "quickstart-avro-status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_ROOT_LOGLEVEL: DEBUG
      CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
    volumes:
      - $PWD/kafka/jars:/etc/kafka-connect/jars

  ksqldb-server:
    image: confluentinc/ksqldb-server:latest
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - kafka
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: "kafka:9092"
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:latest
    container_name: ksqldb-cli
    depends_on:
      - kafka
      - ksqldb-server
      - schema-registry
    entrypoint: /bin/sh
    tty: true

A subject must first be created for this table. What is the difference between Avro and JSON?

【Comments】:

【Answer 1】:

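Regarding "using the Debezium MySQL connector for Kafka":

You can configure it to use AvroConverter, and then the subject will be created automatically.

Otherwise, you can have KSQL use VALUE_FORMAT=JSON, in which case you need to specify all the field names manually. It is unclear which difference you are asking about (they are different serialization formats), but from KSQL's perspective, JSON on its own is treated as plain text (similar to DELIMITED) and has to be parsed, whereas with formats such as Avro the schema and fields are already known.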
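The JSON route can be sketched as follows. The Connect worker in the question's docker-compose file is configured with JsonConverter, so no Avro schema is registered for ksqlDB to look up, and the columns have to be declared by hand. The column names and types below (id, customer_id, total) are hypothetical, since the question does not show the table definition. The sketch also assumes the topic holds flat JSON records, for example after applying Debezium's ExtractNewRecordState transform with schemas.enable set to false on the converter; otherwise the fields would be nested inside the change-event envelope.

```sql
-- Sketch only: column names and types are assumptions, not taken from the question.
CREATE STREAM orders (
    id INT,
    customer_id INT,
    total DOUBLE
) WITH (
    kafka_topic = 'myserver.mydatabase.orders',
    value_format = 'json'
);
```

With Avro and a registered subject, the column list can be left out, as in the statement from the question, because ksqlDB can read the value schema from Schema Registry.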

【Comments】:

【Answer 2】:

I solved the problem. With this configuration, the MySQL table can be sent to a topic without the before and after states.

CREATE SOURCE CONNECTOR final_connector WITH (
    'connector.class' = 'io.debezium.connector.mysql.MySqlConnector',
    'database.hostname' = 'mysql',
    'database.port' = '3306',
    'database.user' = 'root',
    'database.password' = 'mypassword',
    'database.allowPublicKeyRetrieval' = 'true',
    'database.server.id' = '184055',
    'database.server.name' = 'db',
    'database.whitelist' = 'mydb',
    'database.history.kafka.bootstrap.servers' = 'kafka:9092',
    'database.history.kafka.topic' = 'mydb',
    'table.whitelist' = 'mydb.user',
    'include.schema.changes' = 'false',
    'transforms'= 'unwrap,extractkey',
    'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
    'transforms.extractkey.type'= 'org.apache.kafka.connect.transforms.ExtractField$Key',
    'transforms.extractkey.field'= 'id',
    'key.converter'= 'org.apache.kafka.connect.converters.IntegerConverter',
    'value.converter'= 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url'= 'http://schema-registry:8081'
);

Then simply create your stream!
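A minimal sketch of that last step, under a few assumptions: the connector above writes to the topic db.mydb.user (Debezium names topics after the server name, database, and table), the value schema has been registered in Schema Registry by AvroConverter, and the ksqlDB version in use allows declaring only the key column while inferring the value columns. The stream name users is hypothetical.

```sql
-- The key is declared explicitly because the connector writes it with
-- IntegerConverter, i.e. as a raw integer, which matches ksqlDB's default
-- KAFKA key format. Value columns are inferred from the registered Avro schema.
CREATE STREAM users (id INT KEY) WITH (
    kafka_topic = 'db.mydb.user',
    value_format = 'avro'
);

-- Read from the beginning of the topic to check that rows arrive.
SET 'auto.offset.reset' = 'earliest';
SELECT * FROM users EMIT CHANGES LIMIT 5;
```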

This video can help you a lot:

https://www.youtube.com/watch?v=2fUOi9wJPhk&t=1550s

【Comments】:
