How to Kafka Connect Elasticsearch with SSL?
Posted: 2020-02-11 08:12:12
Question:
Hi, after reading https://docs.confluent.io/current/connect/kafka-connect-elasticsearch/configuration_options.html#security I am still not sure which configuration Kafka Connect needs in order to connect to an Elasticsearch endpoint over https.
I am able to connect Kafka Connect to a different, non-https Elasticsearch, and with a regular client I can connect to the https endpoint by turning off SSL verification.
This example from a different service provider, https://help.aiven.io/en/articles/2349675-aiven-kafka-elasticsearch-sink-connector, makes it look like connection.url, connection.username, and connection.password are enough for https, but I get this error:
org.apache.kafka.connect.errors.ConnectException: Couldn't start ElasticsearchSinkTask due to connection error:
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:132)
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:122)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:122)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:51)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:300)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: javax.net.ssl.SSLHandshakeException: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
at sun.security.ssl.SSLSocketImpl.fatal(SSLSocketImpl.java:1964)
at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:328)
at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:322)
at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1614)
at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1052)
at sun.security.ssl.Handshaker.process_record(Handshaker.java:987)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1072)
at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385)
at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413)
at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1397)
at org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:394)
at org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:353)
at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:141)
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:353)
at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:380)
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:88)
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at io.searchbox.client.http.JestHttpClient.executeRequest(JestHttpClient.java:115)
at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:57)
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.getServerVersion(JestElasticsearchClient.java:206)
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:130)
... 12 more
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:397)
at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:302)
at sun.security.validator.Validator.validate(Validator.java:260)
at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:324)
at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:229)
at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:124)
at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1596)
... 34 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141)
at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126)
at java.security.cert.CertPathBuilder.build(CertPathBuilder.java:280)
at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:392)
... 40 more
Running Confluent 5.2.2 locally, with Elasticsearch 6.8.3.
Config:
{
  "name": "sink-elasticsearch-mytable-ssl",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "connection.url": "https://xxx.databases.appdomain.cloud:30810",
    "connection.username": "ibm_cloud_xxx",
    "connection.password": "xxx",
    "type.name": "type.name=kafka-connect",
    "topics": "postgresql-mytable",
    "key.ignore": true
  }
}
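I am using a managed Elasticsearch service, so all I am given is a username/password and a TLS certificate, and I don't see how to get the keystore/truststore pieces. What else does Kafka Connect Elasticsearch need for SSL?
Comments:
Which version of Kafka Connect are you using? Which version of the Elasticsearch sink connector? Could you edit your question to include the connector JSON configuration?
Running Confluent 5.2.2 locally, with whatever is bundled with it. Updated the question.
Answer 1:
For Elastic Cloud, you only need to pass the username and password: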
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "https://myclusterid.us-east-1.aws.found.io:9243",
"connection.username": "elastic",
"connection.password": "covfefe",
[…]
Comments:
I tested this, and it appears to be a version issue. I tried Elastic Cloud on 6.8.3 and got the same failure. I then upgraded it to 7.4.0 and it connected successfully.
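For reference, the "PKIX path building failed" error in the question means the JVM could not build a trust chain to the server certificate. One approach that may help when the provider hands you only a TLS certificate is to import that certificate into a truststore (for example with the JDK's keytool -importcert) and point the connector at it through its elastic.https.ssl.* options. The following is a minimal sketch, not a verified fix for this cluster: the truststore path and password are hypothetical placeholders, and these SSL options may not be available in the connector version bundled with Confluent 5.2.2, so check the security section of the connector documentation for your version.

```json
{
  "name": "sink-elasticsearch-mytable-ssl",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "https://xxx.databases.appdomain.cloud:30810",
    "connection.username": "ibm_cloud_xxx",
    "connection.password": "xxx",
    "elastic.security.protocol": "SSL",
    "elastic.https.ssl.truststore.location": "/path/to/truststore.jks",
    "elastic.https.ssl.truststore.password": "changeit",
    "elastic.https.ssl.truststore.type": "JKS",
    "topics": "postgresql-mytable",
    "key.ignore": true
  }
}
```

Only a truststore is configured here because the service authenticates the client by username and password. A keystore would matter only if the server required a client certificate.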