Unable to register Debezium (Kafka-Connect) connector in SSL enabled Kafka cluster

Posted: 2019-11-27 10:09:57

Question:

I am trying to register a MySQL Debezium connector in an SSL-enabled Kafka cluster. The curl command I am using is:

curl -k -X POST -H "Accept:application/json" -H "Content-Type:application/json" https://<IPADDRESS>:8083/connectors/ -d '{
  "name": "test-eds-extactor-profile",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "<DBHOSTNAME>",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "*****",
    "database.server.id": "1",
    "database.server.name": "MySQL-Database-Docker",
    "database.history.kafka.bootstrap.servers": "<IPADDRESS>:9094",
    "database.history.kafka.topic": "dbhistory.profile",
    "include.schema.changes": "true",
    "table.whitelist": "test_eds_extraction_src_db_mock.profile",
    "database.history.producer.security.protocol": "SASL_PLAINTEXT",
    "database.history.producer.ssl.keystore.location": "path/to/server.jks",
    "database.history.producer.ssl.keystore.password": "******",
    "database.history.producer.ssl.truststore.location": "path/to//server.jks",
    "database.history.producer.ssl.truststore.password": "******",
    "database.history.producer.ssl.key.password": "******",
    "database.history.consumer.security.protocol": "SASL_PLAINTEXT",
    "database.history.consumer.ssl.keystore.location": "path/to/server.jks",
    "database.history.consumer.ssl.keystore.password": "******",
    "database.history.consumer.ssl.truststore.location": "path/to/server.jks",
    "database.history.consumer.ssl.truststore.password": "******",
    "database.history.consumer.ssl.key.password": "******"
  }
}'

Debezium cannot create the database.history topic and fails with the following error:

{"name":"test-eds-extactor-profile","connector":{"state":"RUNNING","worker_id":"<IPADDRESS>:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"<IPADDRESS>:8083","trace":"org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.common.KafkaException: Failed to construct kafka producer\n\tat io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:273)\n\tat io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:47)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer\n\tat org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:429)\n\tat org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:297)\n\tat io.debezium.relational.history.KafkaDatabaseHistory.start(KafkaDatabaseHistory.java:171)\n\tat io.debezium.connector.mysql.MySqlSchema.start(MySqlSchema.java:161)\n\tat io.debezium.connector.mysql.MySqlTaskContext.start(MySqlTaskContext.java:255)\n\tat io.debezium.connector.mysql.MySqlConnectorTask.createAndStartTaskContext(MySqlConnectorTask.java:330)\n\tat io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:136)\n\t... 9 more\nCaused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config\n\tat org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:153)\n\tat org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140)\n\tat org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)\n\tat org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:108)\n\tat org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:437)\n\tat org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:418)\n\t... 15 more\nCaused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config\n\tat org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:301)\n\tat org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:92)\n\tat org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:60)\n\tat org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:111)\n\tat org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:142)\n\t... 20 more\n"}],"type":"source"}

Prettified error:

org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:273)
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:47)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:429)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:297)
    at io.debezium.relational.history.KafkaDatabaseHistory.start(KafkaDatabaseHistory.java:171)
    at io.debezium.connector.mysql.MySqlSchema.start(MySqlSchema.java:161)
    at io.debezium.connector.mysql.MySqlTaskContext.start(MySqlTaskContext.java:255)
    at io.debezium.connector.mysql.MySqlConnectorTask.createAndStartTaskContext(MySqlConnectorTask.java:330)
    at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:136)
    ... 9 more
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:153)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:108)
    at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:437)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:418)
    ... 15 more
Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
    at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:301)
    at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:92)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:60)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:111)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:142)
    ... 20 more

Answer 1:

You need to add the SSL properties to the JSON body when you register the connector:

database.history.producer.security.protocol=SSL
database.history.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
database.history.producer.ssl.keystore.password=test1234
database.history.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
database.history.producer.ssl.truststore.password=test1234
database.history.producer.ssl.key.password=test1234
database.history.consumer.security.protocol=SSL
database.history.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
database.history.consumer.ssl.keystore.password=test1234
database.history.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
database.history.consumer.ssl.truststore.password=test1234
database.history.consumer.ssl.key.password=test1234

Reference: https://debezium.io/docs/connectors/mysql/ (scroll to the end)
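The properties above apply the same six settings twice, once under the `database.history.producer.` prefix and once under `database.history.consumer.`. As a minimal sketch of how they could be merged into the registration body, the Python below builds the JSON payload for the POST request. The helper names `history_ssl_properties` and `registration_body` are hypothetical and not part of Debezium or Kafka Connect, and the paths and password are the sample values from this answer, not recommendations.

```python
import json

# Hypothetical helper names; only the property keys come from the answer above.
HISTORY_CLIENTS = ("producer", "consumer")


def history_ssl_properties(keystore, truststore, password):
    """Return the database.history.* SSL settings for both history clients."""
    props = {}
    for client in HISTORY_CLIENTS:
        prefix = f"database.history.{client}."
        props[prefix + "security.protocol"] = "SSL"
        props[prefix + "ssl.keystore.location"] = keystore
        props[prefix + "ssl.keystore.password"] = password
        props[prefix + "ssl.truststore.location"] = truststore
        props[prefix + "ssl.truststore.password"] = password
        props[prefix + "ssl.key.password"] = password
    return props


def registration_body(name, base_config, ssl_props):
    """Merge the connector settings with the SSL settings into a JSON body."""
    config = dict(base_config)  # copy so the caller's dict is not modified
    config.update(ssl_props)
    return json.dumps({"name": name, "config": config}, indent=2)


if __name__ == "__main__":
    ssl_props = history_ssl_properties(
        "/var/private/ssl/kafka.server.keystore.jks",
        "/var/private/ssl/kafka.server.truststore.jks",
        "test1234",
    )
    base = {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.history.kafka.bootstrap.servers": "<IPADDRESS>:9094",
        "database.history.kafka.topic": "dbhistory.profile",
    }
    print(registration_body("test-eds-extactor-profile", base, ssl_props))
```

The printed JSON can be saved to a file and passed to curl with `-d @file.json`, which avoids quoting a long body on the command line.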

Comments:

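After this, your history topic will be created, but the table topics will not. To get those created, you also need to add the producer and consumer SSL properties to connect-distributed.properties.

Answer 2: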

This error indicates that your Kafka client cannot see the JAAS configuration. To resolve it, you can export the following variable:

export KAFKA_OPTS="-Djava.security.auth.login.config=path/to/jaas.conf"

Comments:

I tried this: I set KAFKA_OPTS and then ran the curl command, but ran into the same problem.
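One possible reason the export made no difference: the registration request in the question sets `security.protocol` to `SASL_PLAINTEXT` for both history clients, and the stack trace passes through `SaslChannelBuilder` and `KerberosLogin`, so the clients appear to be attempting a SASL login rather than a plain SSL connection. In addition, `KAFKA_OPTS` is read when the Connect worker JVM starts, so exporting it in the shell that runs curl would not be expected to change a worker that is already running. The sketch below is a hypothetical checker, not part of Kafka or Debezium, that flags both patterns in a connector config. It assumes the four standard `security.protocol` values (`PLAINTEXT`, `SSL`, `SASL_PLAINTEXT`, `SASL_SSL`).

```python
# Hypothetical config checker; the protocol names are Kafka's standard values.
TLS_PROTOCOLS = {"SSL", "SASL_SSL"}
SASL_PROTOCOLS = {"SASL_PLAINTEXT", "SASL_SSL"}
SUFFIX = "security.protocol"


def check_security_settings(config):
    """Return human-readable findings for every *security.protocol key."""
    findings = []
    for key in sorted(config):
        if not key.endswith(SUFFIX):
            continue
        prefix = key[: -len(SUFFIX)]
        protocol = str(config[key]).upper()
        # A SASL protocol makes the client perform a SASL login.
        if protocol in SASL_PROTOCOLS:
            findings.append(f"{key}={protocol}: client will attempt a SASL login")
        # ssl.* settings only take effect when the protocol uses TLS.
        has_ssl_keys = any(k.startswith(prefix + "ssl.") for k in config)
        if has_ssl_keys and protocol not in TLS_PROTOCOLS:
            findings.append(f"{prefix}ssl.*: set, but {protocol} does not use TLS")
    return findings


if __name__ == "__main__":
    question_config = {
        "database.history.producer.security.protocol": "SASL_PLAINTEXT",
        "database.history.producer.ssl.keystore.location": "path/to/server.jks",
        "database.history.consumer.security.protocol": "SASL_PLAINTEXT",
        "database.history.consumer.ssl.keystore.location": "path/to/server.jks",
    }
    for finding in check_security_settings(question_config):
        print(finding)
```

Run against the settings from the question, the checker reports both history clients. With `security.protocol` set to `SSL`, as in Answer 1, it returns an empty list.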
