如何将 Debezium SQL Server 连接器与 ksqlDB 嵌入式连接一起使用?

Posted

技术标签:

【中文标题】如何将 Debezium SQL Server 连接器与 ksqlDB 嵌入式连接一起使用?【英文标题】:How to use the Debezium SQL Server connector with ksqlDB embedded Connect? 【发布时间】:2021-01-04 20:08:05 【问题描述】:

在尝试了一天的各种配置选项后,我无法让 Debezium SQL Server 连接器与 ksqlDB 嵌入式 Kafka Connect 一起工作。 ksqlDB 或 Debezium 网站上似乎没有关于如何设置的明确指导。

那里的文档表明,只需将相关的 Kafka Connect 插件安装到指定位置,一切都应该开箱即用地神奇地工作......但不幸的是,我没有这样的运气和错误消息我'我收到的信息没有给我任何反馈。

我的 docker-compose.yml 如下所示,我有 DEBEZIUM_VERSION=1.3 和我的 .env 文件中的 KSQLDB_VERSION=0.11.0:

version: '3.7'
services:
  # ***********************
  # * Kafka               *
  # ***********************
  zookeeper:
    image: debezium/zookeeper:$DEBEZIUM_VERSION
    networks:
      - infrastructure
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888

  kafka:
    image: debezium/kafka:$DEBEZIUM_VERSION
    depends_on:
      - zookeeper
    networks:
      - infrastructure
    ports:
     - 9092:9092
     - 19092:19092
    environment:
     - BROKER_ID=1
     - ZOOKEEPER_CONNECT=zookeeper:2181
     - ALLOW_PLAINTEXT_LISTENER=yes

  schema-registry:
    image: confluentinc/cp-schema-registry
    depends_on:
      - zookeeper
      - kafka
    networks:
      - infrastructure
    ports:
     - 8181:8181
     - 8081:8081
    environment:
     - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
     - SCHEMA_REGISTRY_HOST_NAME=schema-registry
     - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
     - SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS=GET,POST,PUT,OPTIONS
     - SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN=*

  schema-registry-ui:
    image: landoop/schema-registry-ui
    depends_on:
      - schema-registry
    networks:
      - infrastructure
    ports:
     - 8000:8000
    environment:
     - SCHEMAREGISTRY_URL=http://schema-registry:8081
     - PROXY=true

  ksqldb-server:
    image: confluentinc/ksqldb-server:$KSQLDB_VERSION
    build:
      context: ksqldb-sqlserver
      args:
        KSQLDB_VERSION: $KSQLDB_VERSION
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - kafka
      - schema-registry
    networks:
      - infrastructure
    ports:
      - "8088:8088"
    #volumes:
    #  - "./confluent-hub-components/:/usr/share/kafka/plugins/"
    environment:
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_BOOTSTRAP_SERVERS: "kafka:9092"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_CONNECT_GROUP_ID: "ksql-connect-cluster"
      KSQL_CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      KSQL_CONNECT_KEY_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      KSQL_CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      KSQL_CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: "ksql-connect-configs"
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: "ksql-connect-offsets"
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: "ksql-connect-statuses"
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_PLUGIN_PATH: "/usr/share/confluent-hub-components"

  # *-----------------------------*
    # To connect to the DB: 
    #   docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
    # *-----------------------------*  
  ksqldb-cli:
    image: confluentinc/ksqldb-cli:$KSQLDB_VERSION
    container_name: ksqldb-cli
    depends_on:
      - kafka
      - ksqldb-server
    networks:
      - infrastructure
    entrypoint: /bin/sh
    tty: true

networks:
  infrastructure:
    name: infrastructure

我上面的 ksqldb-sqlserver 服务的 Dockerfile,基本上只是取默认镜像,然后将 Debezium SQL Server 连接器插件的内容复制到 /usr/share/confluent-hub-components,如下所示:

ARG KSQLDB_VERSION

FROM confluentinc/ksqldb-server:$KSQLDB_VERSION

ARG DEBEZIUM_VERSION=1.3.0.Beta2

ENV PLUGINS_DIR=/usr/share/confluent-hub-components/
ENV CONNECT_PLUGIN_PATH="/usr/share/confluent-hub-components"
USER root
RUN mkdir -p $PLUGINS_DIR && chown -R appuser: $PLUGINS_DIR
USER appuser
RUN cd $PLUGINS_DIR \
    && curl -sO https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/$DEBEZIUM_VERSION/debezium-connector-sqlserver-$DEBEZIUM_VERSION-plugin.tar.gz \
    && tar -xzf debezium-connector-sqlserver-$DEBEZIUM_VERSION-plugin.tar.gz \
    && rm debezium-connector-sqlserver-$DEBEZIUM_VERSION-plugin.tar.gz

最后我在 ksqldb-cli 中创建连接器的语句如下:

CREATE SOURCE CONNECTOR accounts_reader WITH (
    'connector.class' = 'io.debezium.connector.sqlserver.SqlServerConnector',
    'database.hostname' = 'host.docker.internal',
    'database.port' = '1433',
    'database.user' = '******',
    'database.password' = ''******',
    'database.dbname' = ''******',
    'database.server.name' = ''******',
    'table.whitelist' = ''******',
    'database.history.kafka.bootstrap.servers' = 'kafka:9092',
    'database.history.kafka.topic' = ''******',
    'tasks.max' = '1'
);

ksqlDB CLI 返回“HTTP ERROR 500 Request failed”,在检查 ksqldb-server 的 Docker 日志时,我看到以下神秘消息(由于 *** 字符限制,我不得不稍微截断):

[2020-09-17 21:34:15,983] INFO Received: KsqlRequestksql='CREATE SOURCE CONNECTOR accounts_reader WITH (
    'connector.class' = 'io.debezium.connector.sqlserver.SqlServerConnector',
    'database.hostname' = 'host.docker.internal',
    'database.port' = '1433',
    'database.user' = '******',
    'database.password' = '******',
    'database.dbname' = '******',
    'database.server.name' = '******',
    'table.whitelist' = '******',
    'database.history.kafka.bootstrap.servers' = 'kafka:9092',
    'database.history.kafka.topic' = '******',
    'tasks.max' = '1'
);', configOverrides=, requestProperties=, commandSequenceNumber=Optional[-1] (io.confluent.ksql.rest.server.resources.KsqlResource:223)
[2020-09-17 21:34:16,156] WARN /connectors (org.eclipse.jetty.server.HttpChannel:600)
javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: io/debezium/DebeziumException
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:410)
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:763)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:551)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1610)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1369)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:489)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1580)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1284)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234)
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
    at org.eclipse.jetty.server.Server.handle(Server.java:501)
    at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)
    at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:556)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:272)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
    at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: io/debezium/DebeziumException
    at org.glassfish.jersey.servlet.internal.ResponseWriter.rethrow(ResponseWriter.java:254)
    at org.glassfish.jersey.servlet.internal.ResponseWriter.failure(ResponseWriter.java:236)
    at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:436)
    at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:261)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680)
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
    ... 30 more
...
...
Caused by: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at org.glassfish.jersey.servlet.internal.ResponseWriter.rethrow(ResponseWriter.java:254)
    at org.glassfish.jersey.servlet.internal.ResponseWriter.failure(ResponseWriter.java:236)
    at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:436)
    at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:261)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680)
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
    ... 34 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at io.debezium.connector.sqlserver.SqlServerConnector.config(SqlServerConnector.java:57)
    at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:366)
    at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$1(AbstractHerder.java:326)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
[2020-09-17 21:34:16,180] WARN unhandled due to prior sendError (org.eclipse.jetty.server.HttpChannelState:768)
javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:410)
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:763)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:551)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1610)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1369)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:489)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1580)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1284)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234)
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
    at org.eclipse.jetty.server.Server.handle(Server.java:501)
    at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)
    at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:556)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:272)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
    at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.produce(EatWhatYouKill.java:135)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at org.glassfish.jersey.servlet.internal.ResponseWriter.rethrow(ResponseWriter.java:254)
    at org.glassfish.jersey.servlet.internal.ResponseWriter.failure(ResponseWriter.java:236)
    at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:436)
    at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:261)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680)
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
    ... 34 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at io.debezium.connector.sqlserver.SqlServerConnector.config(SqlServerConnector.java:57)
    at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:366)
    at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$1(AbstractHerder.java:326)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
[2020-09-17 21:34:16,191] WARN /connectors (org.eclipse.jetty.server.HttpChannel:600)
javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:410)
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:763)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:551)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1610)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1369)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:489)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1580)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1284)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234)
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
    at org.eclipse.jetty.server.Server.handle(Server.java:501)
    at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)
    at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:556)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:272)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
    at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.produce(EatWhatYouKill.java:135)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
    at java.base/java.lang.Thread.run(Thread.java:834)
...
...
[2020-09-17 21:34:16,192] WARN unhandled due to prior sendError (org.eclipse.jetty.server.HttpChannelState:768)
javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:410)
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:763)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:551)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1610)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1369)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:489)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1580)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1284)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234)
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
    at org.eclipse.jetty.server.Server.handle(Server.java:501)
    at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)
    at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:556)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:272)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
    at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.produce(EatWhatYouKill.java:135)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at org.glassfish.jersey.servlet.internal.ResponseWriter.rethrow(ResponseWriter.java:254)
    at org.glassfish.jersey.servlet.internal.ResponseWriter.failure(ResponseWriter.java:236)
    at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:436)
    at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:261)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680)
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
    ... 34 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at io.debezium.connector.sqlserver.SqlServerConnector.config(SqlServerConnector.java:57)
    at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:366)
    at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$1(AbstractHerder.java:326)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
[2020-09-17 21:34:16,194] WARN Failed to query connect cluster after 3 attempts. (io.confluent.ksql.services.DefaultConnectClient:278)
[2020-09-17 21:34:16,195] WARN Did not CREATE connector ACCOUNTS_READER: <html>
<head>
<meta http-equiv="Content-Type" content="text/html;charset=utf-8"/>
<title>Error 500 Request failed.</title>
</head>
<body><h2>HTTP ERROR 500 Request failed.</h2>
<table>
<tr><th>URI:</th><td>/connectors</td></tr>
<tr><th>STATUS:</th><td>500</td></tr>
<tr><th>MESSAGE:</th><td>Request failed.</td></tr>
<tr><th>SERVLET:</th><td>org.glassfish.jersey.servlet.ServletContainer-7c956dda</td></tr>
</table>
<hr><a href="http://eclipse.org/jetty">Powered by Jetty:// 9.4.30.v20200611</a><hr/>

</body>
</html>
 (io.confluent.ksql.services.DefaultConnectClient:115)
[2020-09-17 21:34:16,196] INFO Processed successfully: KsqlRequestksql='CREATE SOURCE CONNECTOR accounts_reader WITH (
    'connector.class' = 'io.debezium.connector.sqlserver.SqlServerConnector',
    'database.hostname' = 'host.docker.internal',
    'database.port' = '1433',
    'database.user' = '******',
    'database.password' = '******',
    'database.dbname' = '******',
    'database.server.name' = '******',
    'table.whitelist' = '******',
    'database.history.kafka.bootstrap.servers' = 'kafka:9092',
    'database.history.kafka.topic' = '******',
    'tasks.max' = '1'
);', configOverrides=, requestProperties=, commandSequenceNumber=Optional[-1] (io.confluent.ksql.rest.server.resources.KsqlResource:261)

我不是 100% 确定,但我不认为这意味着缺少 SqlServerConnectorConfig 类,而是初始化时存在基本错误(可能比连接器配置中的一些错误更基本)。

有没有人能指出我哪里出错了,或者,一种在 ksqlDB 嵌入式连接中设置 Debezium 源连接器和 JDBC 接收器连接器的简单方法?

【问题讨论】:

【参考方案1】:

这是一个为您工作的 Docker Compose。如果您愿意,您可以从中提炼出如何构建自己的 Docker 映像

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.1
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:5.5.1
    container_name: broker
    depends_on:
      - zookeeper
    ports:
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    # An important note about accessing Kafka from clients on other machines:
    # -----------------------------------------------------------------------
    #
    # The config used here exposes port 9092 for _external_ connections to the broker
    # i.e. those from _outside_ the docker network. This could be from the host machine
    # running docker, or maybe further afield if you've got a more complicated setup.
    # If the latter is true, you will need to change the value 'localhost' in
    # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those
    # remote clients
    #
    # For connections _internal_ to the docker network, such as from other services
    # and components, use kafka:29092.
    #
    # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    #
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.1
    container_name: schema-registry
    ports:
      - "8081:8081"
    depends_on:
      - zookeeper
      - broker
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181

  ksqldb:
    # *-----------------------------*
    # To connect to ksqlDB CLI
    #   docker exec --interactive --tty ksqldb ksql http://localhost:8088
    # *-----------------------------*
    image: confluentinc/ksqldb-server:0.11.0
    container_name: ksqldb
    depends_on:
      - broker
    ports:
      - "8088:8088"
      - "8083:8083"
    user: root
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:29092
      KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_KSQL_HIDDEN_TOPICS: '^_.*'
      # Setting KSQL_KSQL_CONNECT_WORKER_CONFIG enables embedded Kafka Connect
      KSQL_KSQL_CONNECT_WORKER_CONFIG: "/etc/ksqldb/connect.properties"
      # Kafka Connect config below
      KSQL_CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_CONNECT_REST_ADVERTISED_HOST_NAME: 'ksqldb'
      KSQL_CONNECT_REST_PORT: 8083
      KSQL_CONNECT_GROUP_ID: ksqldb-kafka-connect-group-01
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-configs
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-offsets
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-status
      KSQL_CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      KSQL_CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %Xconnector.context%m (%c:%L)%n"
      KSQL_CONNECT_PLUGIN_PATH: '/usr/share/java'

    command:
      # In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
      - bash
      - -c
      - |
        echo "Installing connector plugins"
        # ------ hack to workaround absence of confluent-hub client
        # mkdir -p /usr/share/confluent-hub-components/
        # confluent-hub install --no-prompt --component-dir /usr/share/confluent-hub-components/ debezium/debezium-connector-sqlserver:1.2.2
        curl https://d1i4a15mxbxib1.cloudfront.net/api/plugins/debezium/debezium-connector-sqlserver/versions/1.2.2/debezium-debezium-connector-sqlserver-1.2.2.zip -o /tmp/kafka-connect-mssql.zip
        yum install -y unzip
        unzip /tmp/kafka-connect-mssql.zip -d /usr/share/java/
        # ----------------------------------------------------------
        #
        echo "Launching ksqlDB"
        /usr/bin/docker/run &

        sleep infinity

您可以找到 Docker Compose 的完整副本,包括为 CDC here 配置的示例 MS SQL 容器

有关完整的演练和工作示例,请参阅此博客:https://rmoff.net/2020/09/18/using-the-debezium-ms-sql-connector-with-ksqldb-embedded-kafka-connect/

【讨论】:

感谢您,非常感谢 - 也是一篇非常有用的文章!最终,我希望使用带有嵌入式 Connect 的 ksqlDB 服务器集群来完全替换独立的 Connect 实例,主要是为了减少移动部件的数量。这样做是否有很多开销(因为我们最初将执行的计算量非常低)? 澄清一下,“开销”是指大量额外的 RAM/资源使用(即使连接和聚合等高级 ksqlDB 功能没有被大量使用)或任何会使维护最好使用单独的 Connect 集群。 嵌入式连接肯定是有意义的,直到(或如果)你需要扩展你的连接或你的 ksqlDB 并且不能独立地这样做。有关此问题的更多讨论,请前往cnfl.io/slack 并加入#ksqldb 频道。 谢谢 - 会的!

以上是关于如何将 Debezium SQL Server 连接器与 ksqlDB 嵌入式连接一起使用?的主要内容,如果未能解决你的问题,请参考以下文章

如何将 Debezium SQL Server 连接器与 ksqlDB 嵌入式连接一起使用?

SQL Server 的 Debezium 连接器,并希望在 debezium 层进行过滤

如何在不使用 Docker 或 Windows Server 2016 上的 Confluent 平台的情况下在 Kafka 中设置 Debezium SQL Server 连接器?

如何在 Docker 中将 Debezium 连接到 MongoDB?

安装 Debezium SQL Server CDC 源连接器

成功创建 Always On SQL Server 快照后,Debezium 未跟踪 CDC