Kafka Elasticsearch 接收器连接器:连接错误
Posted
技术标签:
【中文标题】Kafka Elasticsearch 接收器连接器:连接错误【英文标题】:Kafka Elasticsearch Sink Connector: Connection Error 【发布时间】:2021-01-08 07:04:29 【问题描述】:我不熟悉使用 Kafka 和 Kafka 连接器。我一直在尝试使用 Elastic Search 作为接收器,使用 Kafka 连接器从我的应用程序流式传输数据。我能够在 Kafka 中看到消息,但我的连接器不断在下面抛出此错误:
ERROR WorkerSinkTaskid=elasticsearch-sink-0 Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Couldn't start ElasticsearchSinkTask due to connection error:
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:159)
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:142)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:122)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:51)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:300)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.searchbox.client.config.exception.CouldNotConnectException: Could not connect to http://elasticsearch:9200
at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:73)
at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:63)
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.getServerVersion(JestElasticsearchClient.java:247)
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:151)
... 12 more
Caused by: org.apache.http.conn.HttpHostConnectException: Connect to elasticsearch:9200 [elasticsearch/172.20.0.7] failed: Connection refused (Connection refused)
at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:159)
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:359)
at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:381)
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:237)
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:111)
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
at io.searchbox.client.http.JestHttpClient.executeRequest(JestHttpClient.java:136)
at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:70)
... 15 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.apache.http.conn.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:75)
at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
... 26 more
[2020-09-22 08:16:32,656] ERROR WorkerSinkTaskid=elasticsearch-sink-0 Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
我已经把所有东西都docker化了,我有4个容器,一个用于我的应用程序,一个用于elasticsearch,一个用于kafka,一个用于连接器,如下所示:
zookeeper:
container_name: zookeeper
image: wurstmeister/zookeeper:latest
env_file:
- ".env"
ports:
- 2181:2181
networks:
- "main_net"
kafka:
container_name: kafka
image: wurstmeister/kafka:2.11-1.0.2
env_file:
- ".env"
depends_on:
- zookeeper
links:
- zookeeper
ports:
- 9092:9092
- 9094:9094
networks:
- "main_net"
connector_standalone:
container_name: container_standalone
build:
context: kafka/
dockerfile: Dockerfile
depends_on:
- kafka
ports:
- 8083:8083
networks:
- "main_net"
elasticsearch:
container_name: elasticsearch
build:
context: elasticsearch/
ports:
- "9200:9200"
- "9300:9300"
env_file:
- ".env"
environment:
discovery.type: single-node
networks:
- "main_net"
website:
container_name: "application"
build: "./"
command: >
volumes:
- "./application:/app"
ports:
- "8000:8000"
networks:
- "main_net"
据我了解,我已经给出了连接 URL,基于它应该连接到的主机,即 elasticsearch 容器。但是我对错误感到困惑什么是错的。以下是我的配置文件:
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=vehicle
topic.index=vehicles
connection.url=http://elasticsearch:9200/
connection.user=elastic
connection.password=changeme
type.name=log
key.ignore=true
schema.ignore=true
【问题讨论】:
您使用的是哪个版本的 Apache Kafka、连接器插件和 Elasticsearch?你的话题真的叫logs
吗?
FWIW 你可以在这里找到一个 Kafka 的工作示例 -> 使用 Docker Compose 的 Elasticsearch:rmoff.dev/kafka-elasticsearch(随附视频:rmoff.dev/kafka-elasticsearch-video)
我的错!我已经更新了正确的主题名称。
您使用的是哪个版本的 Apache Kafka、连接器插件和 Elasticsearch?
顺便说一句,如果您编辑问题以更改错误,那么最好在 cmets 中注意这一点!我才意识到你修改了它。
【参考方案1】:
因为这个而失败
Connect to elasticsearch:9200 [elasticsearch/172.20.0.7] failed: Connection refused
因此,要么 Elasticsearch 尚未启动,要么已启动但不接受连接。
在创建您的 Kafka Connect 连接器之前,请确保您的 Kafka Connect 工作人员可以通过运行此命令来使用 Elasticsearch
docker exec connector_standalone curl -sS elasticsearch:9200/
你应该得到类似的东西
"name" : "216261a864bd",
"cluster_name" : "docker-cluster",
"cluster_uuid" : "tGHje8KSTPiafT7CLt77uQ",
"version" :
"number" : "7.6.2",
"build_flavor" : "default",
"build_type" : "docker",
"build_hash" : "ef48eb35cf30adf4db14086e8aabd07ef6fb113f",
"build_date" : "2020-03-26T06:34:37.794943Z",
"build_snapshot" : false,
"lucene_version" : "8.4.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
,
"tagline" : "You Know, for Search"
FWIW 你可以找到一个 Kafka 的工作示例 -> Elasticsearch using Docker Compose here (?)
【讨论】:
"error":"root_cause":["type":"security_exception","reason":"missing authentication credentials for REST request [/]","header":"WWW-Authenticate":"Basic realm=\"security\" charset=\"UTF-8\""],"type":"security_exception","reason":"missing authentication credentials for REST request [/]","header":"WWW-Authenticate":"Basic realm=\"security\" charset=\"UTF-8\"","status":401%
这是我得到的,但我提供了正确的用户名和密码,我也可以登录。
这是您在问题中提出的不同错误。如果您将其标记为已回答(我相信这是针对您的原始问题),它可以让 *** 易于为其他人导航,然后使用新错误开始一个新问题。以上是关于Kafka Elasticsearch 接收器连接器:连接错误的主要内容,如果未能解决你的问题,请参考以下文章
是否可以为一个带有使用 debezium 和 kafka 的表的数据库创建一个 Elasticsearch 索引?
KAFKA SINK CONNECT: WARN 批量请求 167 失败。重试请求
将 Elasticsearch 中的数据读入 Flink 聚合?
Kafka 如何使用 SSL 连接 Elasticsearch?