Unable to register Debezium (Kafka-Connect) connector in SSL enabled Kafka cluster
Posted: 2019-11-27 10:09:57
Question: I am trying to register a MySQL Debezium connector in an SSL-enabled Kafka cluster. The curl command I am using for this is:
curl -k -X POST -H "Accept:application/json" -H "Content-Type:application/json" https://<IPADDRESS>:8083/connectors/ -d '{
  "name": "test-eds-extactor-profile",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "<DBHOSTNAME>",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "*****",
    "database.server.id": "1",
    "database.server.name": "MySQL-Database-Docker",
    "database.history.kafka.bootstrap.servers": "<IPADDRESS>:9094",
    "database.history.kafka.topic": "dbhistory.profile",
    "include.schema.changes": "true",
    "table.whitelist": "test_eds_extraction_src_db_mock.profile",
    "database.history.producer.security.protocol": "SASL_PLAINTEXT",
    "database.history.producer.ssl.keystore.location": "path/to/server.jks",
    "database.history.producer.ssl.keystore.password": "******",
    "database.history.producer.ssl.truststore.location": "path/to//server.jks",
    "database.history.producer.ssl.truststore.password": "******",
    "database.history.producer.ssl.key.password": "******",
    "database.history.consumer.security.protocol": "SASL_PLAINTEXT",
    "database.history.consumer.ssl.keystore.location": "path/to/server.jks",
    "database.history.consumer.ssl.keystore.password": "******",
    "database.history.consumer.ssl.truststore.location": "path/to/server.jks",
    "database.history.consumer.ssl.truststore.password": "******",
    "database.history.consumer.ssl.key.password": "******"
  }
}'
Debezium is unable to create the database.history topic and fails with the following error:
{"name":"test-eds-extactor-profile","connector":{"state":"RUNNING","worker_id":"<IPADDRESS>:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"<IPADDRESS>:8083","trace":"org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.common.KafkaException: Failed to construct kafka producer\n\tat io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:273)\n\tat io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:47)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer\n\tat org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:429)\n\tat org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:297)\n\tat io.debezium.relational.history.KafkaDatabaseHistory.start(KafkaDatabaseHistory.java:171)\n\tat io.debezium.connector.mysql.MySqlSchema.start(MySqlSchema.java:161)\n\tat io.debezium.connector.mysql.MySqlTaskContext.start(MySqlTaskContext.java:255)\n\tat io.debezium.connector.mysql.MySqlConnectorTask.createAndStartTaskContext(MySqlConnectorTask.java:330)\n\tat io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:136)\n\t... 9 more\nCaused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config\n\tat org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:153)\n\tat org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140)\n\tat org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)\n\tat org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:108)\n\tat org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:437)\n\tat org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:418)\n\t... 15 more\nCaused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config\n\tat org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:301)\n\tat org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:92)\n\tat org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:60)\n\tat org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:111)\n\tat org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:142)\n\t... 20 more\n"}],"type":"source"}
Prettified error:
Failed to construct kafka producer
at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:273)
at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:47)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:429)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:297)
at io.debezium.relational.history.KafkaDatabaseHistory.start(KafkaDatabaseHistory.java:171)
at io.debezium.connector.mysql.MySqlSchema.start(MySqlSchema.java:161)
at io.debezium.connector.mysql.MySqlTaskContext.start(MySqlTaskContext.java:255)
at io.debezium.connector.mysql.MySqlConnectorTask.createAndStartTaskContext(MySqlConnectorTask.java:330)
at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:136)
... 9 more
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:153)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:108)
at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:437)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:418)
... 15 more
Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:301)
at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:92)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:60)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:111)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:142)
... 20 more
Answer 1: You need to add the SSL properties to the JSON body when registering the connector:
database.history.producer.security.protocol=SSL
database.history.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
database.history.producer.ssl.keystore.password=test1234
database.history.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
database.history.producer.ssl.truststore.password=test1234
database.history.producer.ssl.key.password=test1234
database.history.consumer.security.protocol=SSL
database.history.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
database.history.consumer.ssl.keystore.password=test1234
database.history.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
database.history.consumer.ssl.truststore.password=test1234
database.history.consumer.ssl.key.password=test1234
Reference: https://debezium.io/docs/connectors/mysql/ (scroll to the end)
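To make the suggestion concrete, here is a minimal sketch that writes the registration payload from the question to a file, with the security protocol switched to SSL for both the history producer and the history consumer. The payload file name, the keystore and truststore paths, and the passwords are placeholders, not values confirmed by the thread; the curl call is left commented out because it needs a running Connect worker.

```shell
#!/bin/sh
# Sketch only: file name, store paths and passwords are placeholder assumptions.
PAYLOAD_FILE="${PAYLOAD_FILE:-connector-ssl.json}"

# Write the connector definition with SSL settings for the database history
# producer and consumer. The quoted heredoc keeps the JSON literal.
cat > "$PAYLOAD_FILE" <<'EOF'
{
  "name": "test-eds-extactor-profile",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "<DBHOSTNAME>",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "*****",
    "database.server.id": "1",
    "database.server.name": "MySQL-Database-Docker",
    "database.history.kafka.bootstrap.servers": "<IPADDRESS>:9094",
    "database.history.kafka.topic": "dbhistory.profile",
    "include.schema.changes": "true",
    "table.whitelist": "test_eds_extraction_src_db_mock.profile",
    "database.history.producer.security.protocol": "SSL",
    "database.history.producer.ssl.keystore.location": "/var/private/ssl/kafka.server.keystore.jks",
    "database.history.producer.ssl.keystore.password": "test1234",
    "database.history.producer.ssl.truststore.location": "/var/private/ssl/kafka.server.truststore.jks",
    "database.history.producer.ssl.truststore.password": "test1234",
    "database.history.producer.ssl.key.password": "test1234",
    "database.history.consumer.security.protocol": "SSL",
    "database.history.consumer.ssl.keystore.location": "/var/private/ssl/kafka.server.keystore.jks",
    "database.history.consumer.ssl.keystore.password": "test1234",
    "database.history.consumer.ssl.truststore.location": "/var/private/ssl/kafka.server.truststore.jks",
    "database.history.consumer.ssl.truststore.password": "test1234",
    "database.history.consumer.ssl.key.password": "test1234"
  }
}
EOF

echo "Wrote $PAYLOAD_FILE"

# To register against a running worker (not executed here):
# curl -k -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
#   "https://<IPADDRESS>:8083/connectors/" -d @"$PAYLOAD_FILE"
```

Keeping the payload in a file and passing it with -d @file avoids shell quoting problems with a long inline JSON string. With SSL as the protocol, the client no longer goes through the SASL/Kerberos login path that raised the "No serviceName defined" error in the trace.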
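Comments:
After this, your history topic will be created, but the table topics will not. For that, you also need to add the producer and consumer SSL properties to connect-distributed.properties.
Answer 2: This error indicates that your Kafka client cannot see the JAAS configuration. To resolve this, you can export the following variable: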
export KAFKA_OPTS="-Djava.security.auth.login.config=path/to/jaas.conf"
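One detail worth spelling out: Kafka's launcher scripts read KAFKA_OPTS when the JVM starts, so the variable has to be present in the environment that starts the Kafka Connect worker. Exporting it in the shell that only runs curl does not change a worker that is already running. A minimal sketch, assuming the worker is started with the stock connect-distributed.sh script; the JAAS path and the script and properties paths are placeholders:

```shell
#!/bin/sh
# Assumption: placeholder path; point it at the real JAAS file.
JAAS_CONF="${JAAS_CONF:-path/to/jaas.conf}"

# Kafka's launcher scripts pass KAFKA_OPTS to the JVM at startup.
export KAFKA_OPTS="-Djava.security.auth.login.config=${JAAS_CONF}"
echo "KAFKA_OPTS=${KAFKA_OPTS}"

# Start (or restart) the worker from this same shell so it inherits the variable.
# Not executed here; the script and properties paths are assumptions.
# bin/connect-distributed.sh config/connect-distributed.properties
```

If the worker runs in a container or under a service manager, the variable would need to be set in that environment instead.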
Comments:
I tried this: I set KAFKA_OPTS and then ran the curl command, but I ran into the same problem.
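For the worker-level settings mentioned in the comment on Answer 1, the following sketch appends SSL overrides to the worker properties file. Kafka Connect applies properties prefixed with producer. and consumer. to the clients it creates for connector tasks. The file path, store locations, and passwords are placeholder assumptions, reusing the example values from Answer 1.

```shell
#!/bin/sh
# Sketch only: file path, store locations and passwords are placeholder assumptions.
WORKER_PROPS="${WORKER_PROPS:-connect-distributed.properties}"

# Append SSL overrides for the producers and consumers that the Connect worker
# creates on behalf of connector tasks.
cat >> "$WORKER_PROPS" <<'EOF'
# Producer used by source tasks to write the table topics
producer.security.protocol=SSL
producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
producer.ssl.keystore.password=test1234
producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
producer.ssl.truststore.password=test1234
producer.ssl.key.password=test1234
# Consumer used by sink tasks
consumer.security.protocol=SSL
consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
consumer.ssl.keystore.password=test1234
consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
consumer.ssl.truststore.password=test1234
consumer.ssl.key.password=test1234
EOF

echo "Appended SSL overrides to $WORKER_PROPS"
```

The worker has to be restarted after the file changes for the new settings to take effect.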