Can't connect to spotify kafka container, basic connection problems

【Title】: can't connect to spotify kafka container, basic connection problems
【Posted】: 2017-06-14 06:50:44
【Question】:

I'm stumbling through the basics of Docker and Kafka and can't get a client to connect.

What I have done so far:

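1) Installed Docker for Windows on Windows 10.
2) Opened Kitematic, searched for kafka, and selected spotify/kafka (the wurstmeister image failed to start).
3) The container started, and I can see the image running in the container log.
4) "IP and ports" reports Docker port 9092, with the access port at localhost:32768.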

docker ps shows this:

7bf9f9278e64 spotify/kafka:latest "supervisord -n" 2 hours ago Up 57 minutes 0.0.0.0:32769->2181/tcp, 0.0.0.0:32768->9092/tcp kafka

docker-machine active does not return an active host.

My Groovy class (cut and pasted from an example) sets up the connection like this:

import org.apache.kafka.clients.producer.Producer

class KafkaProducer {

    String topicName = "wills topic"
    Producer<String, String> producer

    def init() {
        Properties props = new Properties()
        props.put("bootstrap.servers", "192.168.1.89:32768")  // host IP and externally mapped port (9092 inside the container)
        props.put("acks", "all")                              // wait for all in-sync replicas to acknowledge each request
        props.put("retries", 0)                               // 0 disables automatic retries of failed requests
        props.put("batch.size", 16384)                        // maximum size of a record batch, in bytes
        props.put("linger.ms", 1)                             // wait up to 1 ms so records can be batched into fewer requests
        props.put("buffer.memory", 33554432)                  // total memory available to the producer for buffering, in bytes
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        // The two deserializer settings are not producer configs; the log below warns that they are unknown.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

        // Fully qualified because this class is itself named KafkaProducer.
        producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props)
        // .... (rest of the method elided in the original post)
    }
}

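When I run this init, I get an error saying the address can't be resolved: java.io.IOException: Can't resolve address: 7bf9f9278e64:9092, which is the container ID and the internal container port. (My script is called from my normal IDE desktop environment.)

Kitematic says the port is mapped, so why can't I connect and then send? Also, since I only downloaded the image through Kitematic, where do I put a docker-compose.yml if I want to change the configuration? It really isn't clear where that can be done.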

18:05:41.022 [main] INFO  o.a.k.c.p.ProducerConfig:[.logAll:] > ProducerConfig values: 
    acks = all
    batch.size = 16384
    block.on.buffer.full = false
    bootstrap.servers = [192.168.1.89:32768]
    buffer.memory = 33554432
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 1
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.fetch.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    timeout.ms = 30000
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

18:05:41.076 [main] INFO  o.a.k.c.p.ProducerConfig:[.logAll:] > ProducerConfig values: 
    acks = all
    batch.size = 16384
    block.on.buffer.full = false
    bootstrap.servers = [192.168.1.89:32768]
    buffer.memory = 33554432
    client.id = producer-1
    compression.type = none
    connections.max.idle.ms = 540000
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 1
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.fetch.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    timeout.ms = 30000
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

18:05:41.079 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name bufferpool-wait-time
18:05:41.083 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name buffer-exhausted-records
18:05:41.085 [main] DEBUG o.a.k.c.Metadata:[.update:] > Updated cluster metadata version 1 to Cluster(id = null, nodes = [192.168.1.89:32768 (id: -1 rack: null)], partitions = [])
18:05:41.401 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name connections-closed:
18:05:41.401 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name connections-created:
18:05:41.402 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name bytes-sent-received:
18:05:41.402 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name bytes-sent:
18:05:41.406 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name bytes-received:
18:05:41.406 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name select-time:
18:05:41.407 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name io-time:
18:05:41.409 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name batch-size
18:05:41.410 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name compression-rate
18:05:41.410 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name queue-time
18:05:41.410 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name request-time
18:05:41.410 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name produce-throttle-time
18:05:41.411 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name records-per-request
18:05:41.412 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name record-retries
18:05:41.412 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name errors
18:05:41.412 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name record-size-max
18:05:41.414 [main] WARN  o.a.k.c.p.ProducerConfig:[.logUnused:] > The configuration 'key.deserializer' was supplied but isn't a known config.
18:05:41.414 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.p.i.Sender:[.run:] > Starting Kafka producer I/O thread.
18:05:41.414 [main] WARN  o.a.k.c.p.ProducerConfig:[.logUnused:] > The configuration 'value.deserializer' was supplied but isn't a known config.
18:05:41.416 [main] INFO  o.a.k.c.u.AppInfoParser:[.<init>:] > Kafka version : 0.10.1.1
18:05:41.416 [main] INFO  o.a.k.c.u.AppInfoParser:[.<init>:] > Kafka commitId : f10ef2720b03b247
18:05:41.417 [main] DEBUG o.a.k.c.p.KafkaProducer:[.<init>:] > Kafka producer started
18:05:41.430 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.NetworkClient:[.maybeUpdate:] > Initialize connection to node -1 for sending metadata request
18:05:41.430 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.NetworkClient:[.initiateConnect:] > Initiating connection to node -1 at 192.168.1.89:32768.
18:05:41.434 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name node--1.bytes-sent
18:05:41.434 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name node--1.bytes-received
18:05:41.435 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name node--1.latency
18:05:41.435 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.n.Selector:[.pollSelectionKeys:] > Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
18:05:41.436 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.NetworkClient:[.handleConnections:] > Completed connection to node -1
18:05:41.452 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.NetworkClient:[.maybeUpdate:] > Sending metadata request topics=[wills topic] to node -1
18:05:41.476 [kafka-producer-network-thread | producer-1] WARN  o.a.k.c.NetworkClient:[.handleResponse:] > Error while fetching metadata with correlation id 0 : wills topic=INVALID_TOPIC_EXCEPTION
18:05:41.477 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.Metadata:[.update:] > Updated cluster metadata version 2 to Cluster(id = 8cjV2Ga6RB6bXfeDWWfTKA, nodes = [7bf9f9278e64:9092 (id: 0 rack: null)], partitions = [])
18:05:41.570 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.NetworkClient:[.maybeUpdate:] > Initialize connection to node 0 for sending metadata request
18:05:41.570 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.NetworkClient:[.initiateConnect:] > Initiating connection to node 0 at 7bf9f9278e64:9092.
18:05:43.826 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.NetworkClient:[.initiateConnect:] > Error connecting to node 0 at 7bf9f9278e64:9092:
java.io.IOException: Can't resolve address: 7bf9f9278e64:9092
    at org.apache.kafka.common.network.Selector.connect(Selector.java:180)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:498)
    at org.apache.kafka.clients.NetworkClient.access$400(NetworkClient.java:48)
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:645)
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:552)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:258)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135)
    at java.lang.Thread.run(Thread.java:745)

Thanks for any help getting me over this first hurdle.

【Comments】:

【Answer 1】:

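Try setting --env ADVERTISED_HOST=192.168.1.89 --env ADVERTISED_PORT=32768 when you start the container. This is required because, by default, Kafka advertises its local hostname (that is, the container hostname, e.g. 7bf9f9278e64), which cannot be reached from the host. When you use port binding, you need to advertise your host IP (e.g. 192.168.1.89) and the mapped port (e.g. 32768).

【Comments】: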
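The failure in the log can be reproduced without Kafka. The sketch below is a minimal, standalone illustration, not the client's actual code: the class and method names are made up for this example, and it assumes the client fails in roughly this way, by building a socket address from the advertised host and giving up when the name does not resolve. The bootstrap address is an IP literal, so it always passes; the advertised container hostname only resolves from inside Docker's network.

```java
import java.net.InetSocketAddress;

public class AdvertisedAddressCheck {

    /** Returns true if the host can be turned into an IP address from this machine. */
    public static boolean canResolve(String host, int port) {
        // The constructor attempts name resolution; isUnresolved() reports whether it failed.
        return !new InetSocketAddress(host, port).isUnresolved();
    }

    public static void main(String[] args) {
        // Bootstrap address from the question: an IP literal, no DNS lookup needed.
        System.out.println("192.168.1.89:32768 resolvable: " + canResolve("192.168.1.89", 32768));
        // Address advertised by the broker in its metadata response: the container hostname.
        System.out.println("7bf9f9278e64:9092 resolvable: " + canResolve("7bf9f9278e64", 9092));
    }
}
```

Run from the desktop, the second line would be expected to report that the name is not resolvable, which matches the "Can't resolve address" error. Once ADVERTISED_HOST is set to the host IP, the broker hands back an address that passes the same check.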

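That's probably it. Additionally, you can pass -p 9092:9092 -p 2181:2181 as command-line arguments to docker run so the ports don't move around on you.

I skipped Kitematic, as I can't control how it starts the container. I ran the run command from a PowerShell console as follows: "docker run --detach --name kafka -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=192.168.1.89 --env ADVERTISED_PORT=9092 spotify/kafka" (after I had removed the previous container). It seems to start fine, and "docker logs kafka" says so. I then ran my client producer, which looks healthier, but it repeatedly says "WARN o.a.k.c.NetworkClient:[.handleResponse:] > Error while fetching metadata with correlation id 1 : wills topic=INVALID_TOPIC_EXCEPTION".

How do I communicate with the Kafka process once it is running under docker? The Kafka site has this command, "bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test", but that requires being able to reach the detached Kafka and run a shell command. How do you "talk" to a detached container through docker?

Those are separate questions. You should open new posts for them. I assume this question is answered here, since the connection itself is now working.

...if it solved the problem, also consider accepting the answer to help others with the same problem find the right solution ;-)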
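The INVALID_TOPIC_EXCEPTION warning that remains after the connection fix is most likely unrelated to Docker. Kafka topic names may only contain ASCII letters, digits, '.', '_' and '-', and "wills topic" contains a space. The sketch below is a standalone approximation of that rule, not Kafka's own validator; the class name and the replacement name "wills-topic" are made up for illustration.

```java
import java.util.regex.Pattern;

public class TopicNameCheck {

    // Legal topic characters: ASCII letters, digits, '.', '_' and '-'.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");
    // Kafka limits topic names to 249 characters.
    private static final int MAX_LENGTH = 249;

    public static boolean isValidTopicName(String name) {
        if (name == null || name.isEmpty() || name.length() > MAX_LENGTH) {
            return false;
        }
        // "." and ".." are rejected even though each character is legal.
        if (name.equals(".") || name.equals("..")) {
            return false;
        }
        return LEGAL.matcher(name).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValidTopicName("wills topic"));  // prints false: the space is not allowed
        System.out.println(isValidTopicName("wills-topic"));  // prints true
    }
}
```

Renaming the topic in the producer to something without a space should make the metadata request succeed, assuming the broker allows automatic topic creation or the topic has been created beforehand.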
