Can't connect to spotify kafka container, basic connection problems
Posted: 2017-06-14 06:50:44
Problem description: stumbling through Docker and Kafka basics; can't get a client to connect.
What I've done so far:
1) Installed Docker for Windows on Windows 10.
2) Opened Kitematic, searched for kafka, and picked spotify/kafka (the wurstmeister image failed to start).
3) The container starts, and I can see the running image in the container logs.
4) The "IP AND PORTS" tab reports the Docker port as 9092 and the access port as localhost:32768.
docker ps shows this:
7bf9f9278e64 spotify/kafka:latest "supervisord -n" 2 hours ago Up 57 minutes 0.0.0.0:32769->2181/tcp, 0.0.0.0:32768->9092/tcp kafka
docker-machine active returns no active host.
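(As a quick sanity check on the mappings, a sketch; "kafka" is the container name shown by docker ps above:)

docker port kafka
# expected output, matching the docker ps line:
# 2181/tcp -> 0.0.0.0:32769
# 9092/tcp -> 0.0.0.0:32768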
My Groovy class (cut and pasted from examples) sets up the connection like this:
import org.apache.kafka.clients.producer.Producer

class KafkaProducer {
    String topicName = "wills topic"   // note: Kafka topic names may not contain spaces; this later triggers the INVALID_TOPIC_EXCEPTION seen in the comments
    Producer<String, String> producer

    def init() {
        Properties props = new Properties()
        props.put("bootstrap.servers", "192.168.1.89:32768")  // host IP and mapped external port (9092 inside the container)
        props.put("acks", "all")             // acknowledgements for producer requests
        props.put("retries", 0)              // if a request fails, the producer can automatically retry
        props.put("batch.size", 16384)       // per-partition batch buffer size
        props.put("linger.ms", 1)            // small delay so records can be batched together
        props.put("buffer.memory", 33554432) // total memory available to the producer for buffering
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        // deserializers are consumer settings; the producer logs them as unknown configs (see the WARN lines below)
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props)
    }
    // ...
}
When I run this init I get an error saying it can't resolve the connection: java.io.IOException: Can't resolve address: 7bf9f9278e64:9092, which is the internal container hostname and port. (My script is invoked from my normal desktop IDE environment.)
Kitematic says that is the mapping, so why can't I connect and send? Also, I only downloaded this through Kitematic; if you want to change the configuration, where does the docker-compose.yml go? It's really not clear where to do that.
18:05:41.022 [main] INFO o.a.k.c.p.ProducerConfig:[.logAll:] > ProducerConfig values:
acks = all
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [192.168.1.89:32768]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 1
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
18:05:41.076 [main] INFO o.a.k.c.p.ProducerConfig:[.logAll:] > ProducerConfig values: (same as the block above except client.id = producer-1; repeated dump omitted)
18:05:41.079 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name bufferpool-wait-time
18:05:41.083 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name buffer-exhausted-records
18:05:41.085 [main] DEBUG o.a.k.c.Metadata:[.update:] > Updated cluster metadata version 1 to Cluster(id = null, nodes = [192.168.1.89:32768 (id: -1 rack: null)], partitions = [])
18:05:41.401 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name connections-closed:
18:05:41.401 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name connections-created:
18:05:41.402 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name bytes-sent-received:
18:05:41.402 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name bytes-sent:
18:05:41.406 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name bytes-received:
18:05:41.406 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name select-time:
18:05:41.407 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name io-time:
18:05:41.409 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name batch-size
18:05:41.410 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name compression-rate
18:05:41.410 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name queue-time
18:05:41.410 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name request-time
18:05:41.410 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name produce-throttle-time
18:05:41.411 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name records-per-request
18:05:41.412 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name record-retries
18:05:41.412 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name errors
18:05:41.412 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name record-size-max
18:05:41.414 [main] WARN o.a.k.c.p.ProducerConfig:[.logUnused:] > The configuration 'key.deserializer' was supplied but isn't a known config.
18:05:41.414 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.p.i.Sender:[.run:] > Starting Kafka producer I/O thread.
18:05:41.414 [main] WARN o.a.k.c.p.ProducerConfig:[.logUnused:] > The configuration 'value.deserializer' was supplied but isn't a known config.
18:05:41.416 [main] INFO o.a.k.c.u.AppInfoParser:[.<init>:] > Kafka version : 0.10.1.1
18:05:41.416 [main] INFO o.a.k.c.u.AppInfoParser:[.<init>:] > Kafka commitId : f10ef2720b03b247
18:05:41.417 [main] DEBUG o.a.k.c.p.KafkaProducer:[.<init>:] > Kafka producer started
18:05:41.430 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.NetworkClient:[.maybeUpdate:] > Initialize connection to node -1 for sending metadata request
18:05:41.430 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.NetworkClient:[.initiateConnect:] > Initiating connection to node -1 at 192.168.1.89:32768.
18:05:41.434 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name node--1.bytes-sent
18:05:41.434 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name node--1.bytes-received
18:05:41.435 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name node--1.latency
18:05:41.435 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.n.Selector:[.pollSelectionKeys:] > Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
18:05:41.436 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.NetworkClient:[.handleConnections:] > Completed connection to node -1
18:05:41.452 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.NetworkClient:[.maybeUpdate:] > Sending metadata request topics=[wills topic] to node -1
18:05:41.476 [kafka-producer-network-thread | producer-1] WARN o.a.k.c.NetworkClient:[.handleResponse:] > Error while fetching metadata with correlation id 0 : wills topic=INVALID_TOPIC_EXCEPTION
18:05:41.477 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.Metadata:[.update:] > Updated cluster metadata version 2 to Cluster(id = 8cjV2Ga6RB6bXfeDWWfTKA, nodes = [7bf9f9278e64:9092 (id: 0 rack: null)], partitions = [])
18:05:41.570 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.NetworkClient:[.maybeUpdate:] > Initialize connection to node 0 for sending metadata request
18:05:41.570 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.NetworkClient:[.initiateConnect:] > Initiating connection to node 0 at 7bf9f9278e64:9092.
18:05:43.826 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.NetworkClient:[.initiateConnect:] > Error connecting to node 0 at 7bf9f9278e64:9092:
java.io.IOException: Can't resolve address: 7bf9f9278e64:9092
at org.apache.kafka.common.network.Selector.connect(Selector.java:180)
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:498)
at org.apache.kafka.clients.NetworkClient.access$400(NetworkClient.java:48)
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:645)
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:552)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:258)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135)
at java.lang.Thread.run(Thread.java:745)
Thanks for helping me get over this first hurdle.
Question comments:
【参考方案1】:在启动容器时尝试设置--env ADVERTISED_HOST=192.168.1.89
和--env ADVERTISED_PORT=32768
。这是必需的,因为默认情况下,Kafka 会通告本地主机名(即容器主机名,例如 7bf9f9278e64
),并且无法从主机访问。当您使用端口绑定时,您需要公布您的主机 IP(例如 192.168.1.89
)和映射端口(例如 32768
)。
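A minimal sketch of what that looks like (written as one-line commands since the asker is on PowerShell; 192.168.1.89 is the asker's host IP, so substitute your own):

docker rm -f kafka
docker run --detach --name kafka -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=192.168.1.89 --env ADVERTISED_PORT=9092 spotify/kafka

With the fixed binding -p 9092:9092 the advertised port is 9092; if you keep the random mapping instead, advertise the mapped port (32768 here).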
Answer comments:
That's probably it. Also, you can pass -p 9092:9092 -p 2181:2181 as command-line arguments to docker run so the ports don't move around on you.
Skipped Kitematic since I couldn't control how it starts things. I ran the following from a PowerShell console (after removing the previous container): docker run --detach --name kafka -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=192.168.1.89 --env ADVERTISED_PORT=9092 spotify/kafka. It seems to start fine, and docker logs kafka confirms it. Then I run my client producer, which looks healthier, but it repeatedly says "WARN o.a.k.c.NetworkClient:[.handleResponse:] > Error while fetching metadata with correlation id 1 : wills topic=INVALID_TOPIC_EXCEPTION".
After running it with the docker command, how do I communicate with the Kafka process? The Kafka site shows bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test, but that requires being able to reach the detached Kafka and run a shell command. How do you "talk" to a detached container through Docker?
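For reference, docker exec runs a command inside a running container. A sketch (the Kafka install path inside the spotify/kafka image is a guess; list /opt once inside to find the real one):

docker exec -it kafka bash
# then, inside the container shell:
cd /opt/kafka*/bin
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test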
Those are separate questions; you should open new posts for them. I'll assume the question here is answered, since the connection itself is now working.
... and if it solved the problem, also consider accepting the answer to help others with the same issue find the right solution ;-)