如何将 Debezium SQL Server 连接器与 ksqlDB 嵌入式连接一起使用?
Posted
技术标签:
【中文标题】如何将 Debezium SQL Server 连接器与 ksqlDB 嵌入式连接一起使用?【英文标题】:How to use the Debezium SQL Server connector with ksqlDB embedded Connect? 【发布时间】:2021-01-04 20:08:05 【问题描述】:在尝试了一天的各种配置选项后,我无法让 Debezium SQL Server 连接器与 ksqlDB 嵌入式 Kafka Connect 一起工作。 ksqlDB 或 Debezium 网站上似乎没有关于如何设置的明确指导。
那里的文档表明,只需将相关的 Kafka Connect 插件安装到指定位置,一切都应该开箱即用地神奇地工作......但不幸的是,我没有这样的运气和错误消息我'我收到的信息没有给我任何反馈。
我的 docker-compose.yml 如下所示,我有 DEBEZIUM_VERSION=1.3 和我的 .env 文件中的 KSQLDB_VERSION=0.11.0:
version: '3.7'
services:
# ***********************
# * Kafka *
# ***********************
zookeeper:
image: debezium/zookeeper:$DEBEZIUM_VERSION
networks:
- infrastructure
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: debezium/kafka:$DEBEZIUM_VERSION
depends_on:
- zookeeper
networks:
- infrastructure
ports:
- 9092:9092
- 19092:19092
environment:
- BROKER_ID=1
- ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
schema-registry:
image: confluentinc/cp-schema-registry
depends_on:
- zookeeper
- kafka
networks:
- infrastructure
ports:
- 8181:8181
- 8081:8081
environment:
- SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
- SCHEMA_REGISTRY_HOST_NAME=schema-registry
- SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
- SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS=GET,POST,PUT,OPTIONS
- SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN=*
schema-registry-ui:
image: landoop/schema-registry-ui
depends_on:
- schema-registry
networks:
- infrastructure
ports:
- 8000:8000
environment:
- SCHEMAREGISTRY_URL=http://schema-registry:8081
- PROXY=true
ksqldb-server:
image: confluentinc/ksqldb-server:$KSQLDB_VERSION
build:
context: ksqldb-sqlserver
args:
KSQLDB_VERSION: $KSQLDB_VERSION
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- kafka
- schema-registry
networks:
- infrastructure
ports:
- "8088:8088"
#volumes:
# - "./confluent-hub-components/:/usr/share/kafka/plugins/"
environment:
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_BOOTSTRAP_SERVERS: "kafka:9092"
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
KSQL_CONNECT_GROUP_ID: "ksql-connect-cluster"
KSQL_CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
KSQL_CONNECT_KEY_CONVERTER: "io.confluent.connect.avro.AvroConverter"
KSQL_CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
KSQL_CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
KSQL_CONNECT_CONFIG_STORAGE_TOPIC: "ksql-connect-configs"
KSQL_CONNECT_OFFSET_STORAGE_TOPIC: "ksql-connect-offsets"
KSQL_CONNECT_STATUS_STORAGE_TOPIC: "ksql-connect-statuses"
KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
KSQL_CONNECT_PLUGIN_PATH: "/usr/share/confluent-hub-components"
# *-----------------------------*
# To connect to the DB:
# docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
# *-----------------------------*
ksqldb-cli:
image: confluentinc/ksqldb-cli:$KSQLDB_VERSION
container_name: ksqldb-cli
depends_on:
- kafka
- ksqldb-server
networks:
- infrastructure
entrypoint: /bin/sh
tty: true
networks:
infrastructure:
name: infrastructure
我上面的 ksqldb-sqlserver 服务的 Dockerfile,基本上只是取默认镜像,然后将 Debezium SQL Server 连接器插件的内容复制到 /usr/share/confluent-hub-components,如下所示:
ARG KSQLDB_VERSION
FROM confluentinc/ksqldb-server:$KSQLDB_VERSION
ARG DEBEZIUM_VERSION=1.3.0.Beta2
ENV PLUGINS_DIR=/usr/share/confluent-hub-components/
ENV CONNECT_PLUGIN_PATH="/usr/share/confluent-hub-components"
USER root
RUN mkdir -p $PLUGINS_DIR && chown -R appuser: $PLUGINS_DIR
USER appuser
RUN cd $PLUGINS_DIR \
&& curl -sO https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/$DEBEZIUM_VERSION/debezium-connector-sqlserver-$DEBEZIUM_VERSION-plugin.tar.gz \
&& tar -xzf debezium-connector-sqlserver-$DEBEZIUM_VERSION-plugin.tar.gz \
&& rm debezium-connector-sqlserver-$DEBEZIUM_VERSION-plugin.tar.gz
最后我在 ksqldb-cli 中创建连接器的语句如下:
CREATE SOURCE CONNECTOR accounts_reader WITH (
'connector.class' = 'io.debezium.connector.sqlserver.SqlServerConnector',
'database.hostname' = 'host.docker.internal',
'database.port' = '1433',
'database.user' = '******',
'database.password' = ''******',
'database.dbname' = ''******',
'database.server.name' = ''******',
'table.whitelist' = ''******',
'database.history.kafka.bootstrap.servers' = 'kafka:9092',
'database.history.kafka.topic' = ''******',
'tasks.max' = '1'
);
ksqlDB CLI 返回“HTTP ERROR 500 Request failed”,在检查 ksqldb-server 的 Docker 日志时,我看到以下神秘消息(由于 *** 字符限制,我不得不稍微截断):
[2020-09-17 21:34:15,983] INFO Received: KsqlRequestksql='CREATE SOURCE CONNECTOR accounts_reader WITH (
'connector.class' = 'io.debezium.connector.sqlserver.SqlServerConnector',
'database.hostname' = 'host.docker.internal',
'database.port' = '1433',
'database.user' = '******',
'database.password' = '******',
'database.dbname' = '******',
'database.server.name' = '******',
'table.whitelist' = '******',
'database.history.kafka.bootstrap.servers' = 'kafka:9092',
'database.history.kafka.topic' = '******',
'tasks.max' = '1'
);', configOverrides=, requestProperties=, commandSequenceNumber=Optional[-1] (io.confluent.ksql.rest.server.resources.KsqlResource:223)
[2020-09-17 21:34:16,156] WARN /connectors (org.eclipse.jetty.server.HttpChannel:600)
javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: io/debezium/DebeziumException
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:410)
at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:763)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:551)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1610)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1369)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:489)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1580)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1284)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234)
at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
at org.eclipse.jetty.server.Server.handle(Server.java:501)
at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)
at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:556)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:272)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: io/debezium/DebeziumException
at org.glassfish.jersey.servlet.internal.ResponseWriter.rethrow(ResponseWriter.java:254)
at org.glassfish.jersey.servlet.internal.ResponseWriter.failure(ResponseWriter.java:236)
at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:436)
at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:261)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680)
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
... 30 more
...
...
Caused by: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
at org.glassfish.jersey.servlet.internal.ResponseWriter.rethrow(ResponseWriter.java:254)
at org.glassfish.jersey.servlet.internal.ResponseWriter.failure(ResponseWriter.java:236)
at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:436)
at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:261)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680)
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
... 34 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
at io.debezium.connector.sqlserver.SqlServerConnector.config(SqlServerConnector.java:57)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:366)
at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$1(AbstractHerder.java:326)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
[2020-09-17 21:34:16,180] WARN unhandled due to prior sendError (org.eclipse.jetty.server.HttpChannelState:768)
javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:410)
at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:763)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:551)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1610)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1369)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:489)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1580)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1284)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234)
at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
at org.eclipse.jetty.server.Server.handle(Server.java:501)
at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)
at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:556)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:272)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.produce(EatWhatYouKill.java:135)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
at org.glassfish.jersey.servlet.internal.ResponseWriter.rethrow(ResponseWriter.java:254)
at org.glassfish.jersey.servlet.internal.ResponseWriter.failure(ResponseWriter.java:236)
at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:436)
at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:261)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680)
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
... 34 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
at io.debezium.connector.sqlserver.SqlServerConnector.config(SqlServerConnector.java:57)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:366)
at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$1(AbstractHerder.java:326)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
[2020-09-17 21:34:16,191] WARN /connectors (org.eclipse.jetty.server.HttpChannel:600)
javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:410)
at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:763)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:551)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1610)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1369)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:489)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1580)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1284)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234)
at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
at org.eclipse.jetty.server.Server.handle(Server.java:501)
at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)
at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:556)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:272)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.produce(EatWhatYouKill.java:135)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
at java.base/java.lang.Thread.run(Thread.java:834)
...
...
[2020-09-17 21:34:16,192] WARN unhandled due to prior sendError (org.eclipse.jetty.server.HttpChannelState:768)
javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:410)
at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:763)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:551)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1610)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1369)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:489)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1580)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1284)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234)
at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
at org.eclipse.jetty.server.Server.handle(Server.java:501)
at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)
at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:556)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:272)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.produce(EatWhatYouKill.java:135)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
at org.glassfish.jersey.servlet.internal.ResponseWriter.rethrow(ResponseWriter.java:254)
at org.glassfish.jersey.servlet.internal.ResponseWriter.failure(ResponseWriter.java:236)
at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:436)
at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:261)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680)
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
... 34 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
at io.debezium.connector.sqlserver.SqlServerConnector.config(SqlServerConnector.java:57)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:366)
at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$1(AbstractHerder.java:326)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
[2020-09-17 21:34:16,194] WARN Failed to query connect cluster after 3 attempts. (io.confluent.ksql.services.DefaultConnectClient:278)
[2020-09-17 21:34:16,195] WARN Did not CREATE connector ACCOUNTS_READER: <html>
<head>
<meta http-equiv="Content-Type" content="text/html;charset=utf-8"/>
<title>Error 500 Request failed.</title>
</head>
<body><h2>HTTP ERROR 500 Request failed.</h2>
<table>
<tr><th>URI:</th><td>/connectors</td></tr>
<tr><th>STATUS:</th><td>500</td></tr>
<tr><th>MESSAGE:</th><td>Request failed.</td></tr>
<tr><th>SERVLET:</th><td>org.glassfish.jersey.servlet.ServletContainer-7c956dda</td></tr>
</table>
<hr><a href="http://eclipse.org/jetty">Powered by Jetty:// 9.4.30.v20200611</a><hr/>
</body>
</html>
(io.confluent.ksql.services.DefaultConnectClient:115)
[2020-09-17 21:34:16,196] INFO Processed successfully: KsqlRequestksql='CREATE SOURCE CONNECTOR accounts_reader WITH (
'connector.class' = 'io.debezium.connector.sqlserver.SqlServerConnector',
'database.hostname' = 'host.docker.internal',
'database.port' = '1433',
'database.user' = '******',
'database.password' = '******',
'database.dbname' = '******',
'database.server.name' = '******',
'table.whitelist' = '******',
'database.history.kafka.bootstrap.servers' = 'kafka:9092',
'database.history.kafka.topic' = '******',
'tasks.max' = '1'
);', configOverrides=, requestProperties=, commandSequenceNumber=Optional[-1] (io.confluent.ksql.rest.server.resources.KsqlResource:261)
我不是 100% 确定,但我不认为这意味着缺少 SqlServerConnectorConfig 类,而是初始化时存在基本错误(可能比连接器配置中的一些错误更基本)。
有没有人能指出我哪里出错了,或者,一种在 ksqlDB 嵌入式连接中设置 Debezium 源连接器和 JDBC 接收器连接器的简单方法?
【问题讨论】:
【参考方案1】:这是一个为您工作的 Docker Compose。如果您愿意,您可以从中提炼出如何构建自己的 Docker 映像
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.5.1
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:5.5.1
container_name: broker
depends_on:
- zookeeper
ports:
# "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
# An important note about accessing Kafka from clients on other machines:
# -----------------------------------------------------------------------
#
# The config used here exposes port 9092 for _external_ connections to the broker
# i.e. those from _outside_ the docker network. This could be from the host machine
# running docker, or maybe further afield if you've got a more complicated setup.
# If the latter is true, you will need to change the value 'localhost' in
# KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those
# remote clients
#
# For connections _internal_ to the docker network, such as from other services
# and components, use kafka:29092.
#
# See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
# "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
#
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
schema-registry:
image: confluentinc/cp-schema-registry:5.5.1
container_name: schema-registry
ports:
- "8081:8081"
depends_on:
- zookeeper
- broker
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
ksqldb:
# *-----------------------------*
# To connect to ksqlDB CLI
# docker exec --interactive --tty ksqldb ksql http://localhost:8088
# *-----------------------------*
image: confluentinc/ksqldb-server:0.11.0
container_name: ksqldb
depends_on:
- broker
ports:
- "8088:8088"
- "8083:8083"
user: root
environment:
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: broker:29092
KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
KSQL_KSQL_HIDDEN_TOPICS: '^_.*'
# Setting KSQL_KSQL_CONNECT_WORKER_CONFIG enables embedded Kafka Connect
KSQL_KSQL_CONNECT_WORKER_CONFIG: "/etc/ksqldb/connect.properties"
# Kafka Connect config below
KSQL_CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
KSQL_CONNECT_REST_ADVERTISED_HOST_NAME: 'ksqldb'
KSQL_CONNECT_REST_PORT: 8083
KSQL_CONNECT_GROUP_ID: ksqldb-kafka-connect-group-01
KSQL_CONNECT_CONFIG_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-configs
KSQL_CONNECT_OFFSET_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-offsets
KSQL_CONNECT_STATUS_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-status
KSQL_CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
KSQL_CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
KSQL_CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %Xconnector.context%m (%c:%L)%n"
KSQL_CONNECT_PLUGIN_PATH: '/usr/share/java'
command:
# In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
- bash
- -c
- |
echo "Installing connector plugins"
# ------ hack to workaround absence of confluent-hub client
# mkdir -p /usr/share/confluent-hub-components/
# confluent-hub install --no-prompt --component-dir /usr/share/confluent-hub-components/ debezium/debezium-connector-sqlserver:1.2.2
curl https://d1i4a15mxbxib1.cloudfront.net/api/plugins/debezium/debezium-connector-sqlserver/versions/1.2.2/debezium-debezium-connector-sqlserver-1.2.2.zip -o /tmp/kafka-connect-mssql.zip
yum install -y unzip
unzip /tmp/kafka-connect-mssql.zip -d /usr/share/java/
# ----------------------------------------------------------
#
echo "Launching ksqlDB"
/usr/bin/docker/run &
sleep infinity
您可以找到 Docker Compose 的完整副本,包括为 CDC here 配置的示例 MS SQL 容器
有关完整的演练和工作示例,请参阅此博客:https://rmoff.net/2020/09/18/using-the-debezium-ms-sql-connector-with-ksqldb-embedded-kafka-connect/
【讨论】:
感谢您,非常感谢 - 也是一篇非常有用的文章!最终,我希望使用带有嵌入式 Connect 的 ksqlDB 服务器集群来完全替换独立的 Connect 实例,主要是为了减少移动部件的数量。这样做是否有很多开销(因为我们最初将执行的计算量非常低)? 澄清一下,“开销”是指大量额外的 RAM/资源使用(即使连接和聚合等高级 ksqlDB 功能没有被大量使用)或任何会使维护最好使用单独的 Connect 集群。 嵌入式连接肯定是有意义的,直到(或如果)你需要扩展你的连接或你的 ksqlDB 并且不能独立地这样做。有关此问题的更多讨论,请前往cnfl.io/slack 并加入#ksqldb 频道。 谢谢 - 会的!以上是关于如何将 Debezium SQL Server 连接器与 ksqlDB 嵌入式连接一起使用?的主要内容,如果未能解决你的问题,请参考以下文章
如何将 Debezium SQL Server 连接器与 ksqlDB 嵌入式连接一起使用?
SQL Server 的 Debezium 连接器,并希望在 debezium 层进行过滤
如何在不使用 Docker 或 Windows Server 2016 上的 Confluent 平台的情况下在 Kafka 中设置 Debezium SQL Server 连接器?
如何在 Docker 中将 Debezium 连接到 MongoDB?