Kafka console producer cannot connect to the broker

【Title】: Kafka console producer cannot connect to the broker 【Posted】: 2018-06-16 08:01:28 【Question】:

Connecting to the Kafka broker with the console producer, using the following command:

KAFKA_HEAP_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Dsun.security.krb5.debug=true" \
bin/kafka-console-producer.sh \
--broker-list server-01.eigenroute.com:9092 \
--topic test-topic \
--producer.config config/sasl-producer.properties

fails with this warning:

>test message
[2018-01-06 15:29:10,724] WARN [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-06 15:29:10,816] WARN [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

My Kafka broker appears to be running normally:

KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/kafka/kafka_2.11-1.0.0/config/jaas.conf -Dsun.security.krb5.debug=true -Djava.security.krb5.conf=/etc/krb5.conf -Xmx256M -Xms128M" bin/kafka-server-start.sh config/server-sasl-brokers-zookeeper.properties
[2018-01-06 19:59:27,853] INFO KafkaConfig values:
        advertised.host.name = null
        advertised.listeners = SASL_PLAINTEXT://server-01.eigenroute.com:9092
...
zookeeper.connect = zookeeper-server-01.eigenroute.com:2181,zookeeper-server-02.eigenroute.com:2181,zookeeper-server-03.eigenroute.com:2181/apps/kafka-cluster-demo
...
[2018-01-06 19:59:29,173] INFO zookeeper state changed (SaslAuthenticated) (org.I0Itec.zkclient.ZkClient)
[2018-01-06 19:59:29,207] INFO Created zookeeper path /apps/kafka-cluster-demo (kafka.server.KafkaServer)
...
[2018-01-06 19:59:30,174] INFO zookeeper state changed (SaslAuthenticated) (org.I0Itec.zkclient.ZkClient)
[2018-01-06 19:59:30,389] INFO Cluster ID = TldZ-s6DQtWxpjl045dPlg (kafka.server.KafkaServer)
[2018-01-06 19:59:30,457] INFO [ThrottledRequestReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
...
[2018-01-06 19:59:33,035] INFO Successfully authenticated client: authenticationID=kafka-broker-1-1/server-01.eigenroute.com@EIGENROUTE.COM; authorizationID=kafka-broker-1-1/server-01.eigenroute.com@EIGENROUTE.COM. (org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler)
[2018-01-06 19:59:33,082] INFO [ReplicaFetcherManager on broker 11] Removed fetcher for partitions test-topic-0 (kafka.server.ReplicaFetcherManager)
[2018-01-06 19:59:33,381] INFO Replica loaded for partition test-topic-0 with initial high watermark 0 (kafka.cluster.Replica)
[2018-01-06 19:59:33,385] INFO [Partition test-topic-0 broker=11] test-topic-0 starts at Leader Epoch 1 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2018-01-06 19:59:33,424] INFO [ReplicaFetcherManager on broker 11] Removed fetcher for partitions test-topic-0 (kafka.server.ReplicaFetcherManager)
[2018-01-06 19:59:33,424] INFO [Partition test-topic-0 broker=11] test-topic-0 starts at Leader Epoch 2 from offset 0. Previous Leader Epoch was: 1 (kafka.cluster.Partition)
[2018-01-06 20:09:31,261] INFO [GroupMetadataManager brokerId=11] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2018-01-06 20:19:31,261] INFO [GroupMetadataManager brokerId=11] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2018-01-06 20:29:31,261] INFO [GroupMetadataManager brokerId=11] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

Here is my producer configuration (config/sasl-producer.properties):

bootstrap.servers=server-01.eigenroute.com:9092
compression.type=none
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
        useKeyTab=true \
        storeKey=true  \
        keyTab="/Users/shafiquejamal/allfiles/kerberos/producer1.whatever.keytab" \
        principal="producer1/whatever@EIGENROUTE.COM";

Here is my broker configuration (config/server-sasl-brokers-zookeeper.properties):

broker.id=11
listeners=SASL_PLAINTEXT://server-01.eigenroute.com:9092
advertised.listeners=SASL_PLAINTEXT://server-01.eigenroute.com:9092
# host.name=server-01.eigenroute.com
security.inter.broker.protocol=SASL_PLAINTEXT
# sasl.kerberos.service.name=kafka-broker-1-1/server-01.eigenroute.com
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/log/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zookeeper-server-01.eigenroute.com:2181,zookeeper-server-02.eigenroute.com:2181,zookeeper-server-03.eigenroute.com:2181/apps/kafka-cluster-demo
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

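Note that I am using SASL authentication both between the Kafka broker and ZooKeeper and between the Kafka broker and Kafka clients (in this case, a single producer). Here are the contents of my Kafka broker's jaas.conf file: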

KafkaServer {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/kafka/kafka_2.11-1.0.0/config/kafka-broker-1-1.server-01.eigenroute.com.keytab"
  storeKey=true
  useTicketCache=false
  serviceName="kafka-broker-1-1"
  principal="kafka-broker-1-1/server-01.eigenroute.com@EIGENROUTE.COM";
};

// This is for the broker acting as a client to ZooKeeper
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/kafka/kafka_2.11-1.0.0/config/kafka-broker-1-1.server-01.eigenroute.com.keytab"
  storeKey=true
  useTicketCache=false
  serviceName="zookeeper"
  principal="kafka-broker-1-1/server-01.eigenroute.com@EIGENROUTE.COM";
};

In my /etc/hosts file, I have the following entry:

127.0.0.1 server-01.eigenroute.com
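An entry like this makes the broker's hostname resolve to the loopback address on whichever machine holds the file, which can affect both where a broker binds and where a client connects. The following is a minimal sketch for checking that; the function names `hosts_lookup` and `is_loopback` are hypothetical helpers, not part of Kafka, and the lookup only reads a hosts-format file (it does not consult DNS).

```shell
# Print the first address that a hosts-format FILE maps NAME to.
# Usage: hosts_lookup FILE NAME
hosts_lookup() {
  awk -v name="$2" '
    { sub(/#.*/, "") }   # strip comments before splitting into fields
    { for (i = 2; i <= NF; i++) if ($i == name) { print $1; exit } }
  ' "$1"
}

# Succeed if ADDRESS is an IPv4 or IPv6 loopback address.
# Usage: is_loopback ADDRESS
is_loopback() {
  case "$1" in
    127.*|::1) return 0 ;;
    *) return 1 ;;
  esac
}
```

With the entry above, `is_loopback "$(hosts_lookup /etc/hosts server-01.eigenroute.com)"` succeeds, meaning that name points at the local machine rather than at a routable interface.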

Any suggestions as to why the producer client cannot connect to the Kafka broker? Thanks!

Update: here is the content of the ZooKeeper znode /apps/kafka-cluster-demo/brokers/ids/11:

[zk: zookeeper-server-02.eigenroute.com:2181(CONNECTED) 27] get /apps/kafka-cluster-demo/brokers/ids/11
{"listener_security_protocol_map":{"SASL_PLAINTEXT":"SASL_PLAINTEXT"},"endpoints":["SASL_PLAINTEXT://server-01.eigenroute.com:9092"],"jmx_port":-1,"host":null,"timestamp":"1515275931134","port":-1,"version":4}
cZxid = 0x2c0000023c
ctime = Sat Jan 06 21:58:51 UTC 2018
mZxid = 0x2c0000023c
mtime = Sat Jan 06 21:58:51 UTC 2018
pZxid = 0x2c0000023c
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1001d6237f1001c
dataLength = 209
numChildren = 0

【Answer 1】:

There were two problems with my configuration above. The first was in the producer properties: in config/sasl-producer.properties, the line

sasl.kerberos.service.name=kafka

should be

sasl.kerberos.service.name=kafka-broker-1-1
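The corrected value is the first component (the primary) of the broker principal shown in the question's jaas.conf, kafka-broker-1-1/server-01.eigenroute.com@EIGENROUTE.COM. As a rough sketch, assuming principals of the form primary/instance@REALM and plain key=value properties files, the comparison can be scripted; `principal_primary`, `get_prop` and `service_name_matches` are hypothetical helper names, not Kafka tools.

```shell
# Print the primary (first) component of a Kerberos principal written as
# primary/instance@REALM, e.g. "kafka/host@REALM" -> "kafka".
principal_primary() {
  p=${1%%@*}                 # drop the @REALM suffix, if any
  printf '%s\n' "${p%%/*}"   # drop the /instance part, if any
}

# Print the value of KEY from a Java-style properties FILE.
# Only handles plain "key=value" lines; the last occurrence wins.
# Usage: get_prop FILE KEY
get_prop() {
  awk -F= -v key="$2" '
    $1 == key { sub(/^[^=]*=/, ""); value = $0 }
    END { print value }
  ' "$1"
}

# Succeed if SERVICE_NAME equals the primary of BROKER_PRINCIPAL.
# Usage: service_name_matches SERVICE_NAME BROKER_PRINCIPAL
service_name_matches() {
  [ "$1" = "$(principal_primary "$2")" ]
}

broker_principal='kafka-broker-1-1/server-01.eigenroute.com@EIGENROUTE.COM'
principal_primary "$broker_principal"   # prints kafka-broker-1-1
```

Run from the Kafka directory, `service_name_matches "$(get_prop config/sasl-producer.properties sasl.kerberos.service.name)" "$broker_principal"` fails for the original value kafka and succeeds once the line is changed as shown.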

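This is because the service name configured on the client must match the service name used by the broker, which here is kafka-broker-1-1 (the serviceName in the broker's jaas.conf and the primary of its principal, kafka-broker-1-1/server-01.eigenroute.com@EIGENROUTE.COM). After fixing this, a second problem appeared: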

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after...

The following post has an answer that solves this problem:

ERROR Error when sending message to topic

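For the Kafka broker, in config/server-sasl-brokers-zookeeper.properties, I had to change

listeners=SASL_PLAINTEXT://server-01.eigenroute.com:9092

to

listeners=SASL_PLAINTEXT://0.0.0.0:9092

(This may be related to using AWS.) Now everything works: the producer can write to the topic, and the consumer can read from it.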
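In Kafka, `listeners` is the address the broker binds to, while `advertised.listeners` is the address handed to clients in metadata, so only the former needs to change here. If the /etc/hosts entry from the question is on the broker host, the original setting would have bound the broker to 127.0.0.1 only; that is a plausible explanation, not something the logs above confirm. A minimal sketch of the edit, using a hypothetical helper `bind_all_interfaces` that writes the modified file to standard output and leaves the original untouched:

```shell
# Print FILE with the host in every `listeners=` endpoint replaced by 0.0.0.0.
# Other lines, including `advertised.listeners=`, are printed unchanged.
# Usage: bind_all_interfaces FILE
bind_all_interfaces() {
  awk '
    /^listeners=/ { gsub("://[^:/,]*:", "://0.0.0.0:") }
    { print }
  ' "$1"
}
```

Reviewing the output before replacing the real file is the safer route. Because 0.0.0.0 is not a connectable address for clients, `advertised.listeners` has to stay set to a resolvable name, as it is in the configuration above.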
