Kafka:从 ZooKeeper 获取代理主机
Posted
技术标签:
【中文标题】Kafka:从 ZooKeeper 获取代理主机【英文标题】:Kafka: Get broker host from ZooKeeper 【发布时间】:2015-06-11 23:06:24 【问题描述】:出于特殊原因,我需要同时使用 ConsumerGroup
(又名高级消费者)和 SimpleConsumer
(又名低级消费者)来读取 Kafka。对于ConsumerGroup
,我使用基于 ZooKeeper 的配置并且对它完全满意,但SimpleConsumer
需要实例化种子代理。
我不想同时保留 ZooKeeper 和代理主机的列表。因此,我正在寻找一种方法来从 ZooKeeper自动发现特定主题的代理。
由于一些间接信息,我相信这些数据存储在 ZooKeeper 中的以下路径之一:
/brokers/topics/<topic>/partitions/<partition-id>/state
/brokers/ids/
但是,当我尝试从这些节点读取数据时,出现序列化错误(我为此使用 com.101tec.zkclient
):
org.I0Itec.zkclient.exception.ZkMarshallingError:java.io.StreamCorruptedException:无效流标头:7B226A6D 在 org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:37) 在 org.I0Itec.zkclient.ZkClient.derializable(ZkClient.java:740) 在 org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:773) 在 org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761) 在 org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750) 在 org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744) ... 64 省略 引起:java.io.StreamCorruptedException:无效的流标头:7B226A6D 在 java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804) 在 java.io.ObjectInputStream.(ObjectInputStream.java:299) 在 org.I0Itec.zkclient.serialize.TcclAwareObjectIputStream.(TcclAwareObjectIputStream.java:30) 在 org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:31) ... 69 更多
我可以毫无问题地编写和读取自定义 Java 对象(例如字符串),所以我相信这不是客户端的问题,而是棘手的编码问题。因此,我想知道:
-
如果这是正确的方法,如何正确读取这些节点?
如果整个方法都是错误的,什么是正确的?
【问题讨论】:
我发现有助于在 Zookeeper 数据中查看的一个工具是:code.google.com/p/zooviewer 【参考方案1】:这就是我的一位同事获取 Kafka 代理列表的方式。当您想要动态获取经纪人列表时,我认为这是一种正确的方式。
这是一个示例代码,展示了如何获取列表。
public class KafkaBrokerInfoFetcher
public static void main(String[] args) throws Exception
ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null);
List<String> ids = zk.getChildren("/brokers/ids", false);
for (String id : ids)
String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null));
System.out.println(id + ": " + brokerInfo);
在由三个代理组成的集群上运行代码会导致
1: "jmx_port":-1,"timestamp":"1428512949385","host":"192.168.0.11","version":1,"port":9093
2: "jmx_port":-1,"timestamp":"1428512955512","host":"192.168.0.11","version":1,"port":9094
3: "jmx_port":-1,"timestamp":"1428512961043","host":"192.168.0.11","version":1,"port":9095
【讨论】:
感谢和抱歉迟到的回复。您能否也发布对您在代码中使用的 ZooKeeper 客户端库的引用?在我这边,我想出了如何修复我描述的错误,并将使用 Kafka 库附带的com.101toc.ZkClient
发布替代解决方案。无论如何,我接受你的回答是完全有效的。
应该是import org.apache.zookeeper.ZooKeeper;
+1 太简单和太有效的方式来获取有关 kafka 的信息(brokes,topics,partitions per topic ...等)谢谢。它只需要import org.apache.zookeeper.ZooKeeper
【参考方案2】:
事实证明,Kafka 使用ZKStringSerializer
将数据读写到 znode 中。因此,要修复错误,我只需将其添加为 ZkClient
构造函数中的最后一个参数:
val zkClient = new ZkClient(zkQuorum, Integer.MAX_VALUE, 10000, ZKStringSerializer)
使用它,我编写了几个有用的函数来发现经纪人 ID、他们的地址和其他东西:
import kafka.utils.Json
import kafka.utils.ZKStringSerializer
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.KafkaException
def listBrokers(): List[Int] =
zkClient.getChildren("/brokers/ids").toList.map(_.toInt)
def listTopics(): List[String] =
zkClient.getChildren("/brokers/topics").toList
def listPartitions(topic: String): List[Int] =
val path = "/brokers/topics/" + topic + "/partitions"
if (zkClient.exists(path))
zkClient.getChildren(path).toList.map(_.toInt)
else
throw new KafkaException(s"Topic $topic doesn't exist")
def getBrokerAddress(brokerId: Int): (String, Int) =
val path = s"/brokers/ids/$brokerId"
if (zkClient.exists(path))
val brokerInfo = readZkData(path)
(brokerInfo.get("host").get.asInstanceOf[String], brokerInfo.get("port").get.asInstanceOf[Int])
else
throw new KafkaException("Broker with ID $brokerId doesn't exist")
def getLeaderAddress(topic: String, partitionId: Int): (String, Int) =
val path = s"/brokers/topics/$topic/partitions/$partitionId/state"
if (zkClient.exists(path))
val leaderStr = zkClient.readData[String](path)
val leaderId = Json.parseFull(leaderStr).get.asInstanceOf[Map[String, Any]].get("leader").get.asInstanceOf[Int]
getBrokerAddress(leaderId)
else
throw new KafkaException(s"Topic ($topic) or partition ($partitionId) doesn't exist")
【讨论】:
kafka.utils.ZkUtils 是什么库? 'org.apache.kafka',名称:'kafka_2.13'?这意味着我们必须指定我们正在使用的 kafka 的版本?【参考方案3】:使用 shell 执行此操作:
zookeeper-shell myzookeeper.example.com:2181
ls /brokers/ids
=> [2, 1, 0]
get /brokers/ids/2
get /brokers/ids/1
get /brokers/ids/0
【讨论】:
【参考方案4】:实际上,Kafka 中有 ZkUtils
(至少对于 0.8.x 行),您可以使用一个小警告:您需要重新实现 ZkStringSerializer,它将字符串转换为 UTF-8 编码字节数组。如果您想使用 Java8 的流 API,可以通过 scala.collection.JavaConversions
迭代 Scala 集合。这对我的案子有帮助。
【讨论】:
【参考方案5】: public KafkaProducer(String zookeeperAddress, String topic) throws IOException,
KeeperException, InterruptedException
this.zookeeperAddress = zookeeperAddress;
this.topic = topic;
ZooKeeper zk = new ZooKeeper(zookeeperAddress, 10000, null);
List<String> brokerList = new ArrayList<String>();
List<String> ids = zk.getChildren("/brokers/ids", false);
for (String id : ids)
String brokerInfoString = new String(zk.getData("/brokers/ids/" + id, false, null));
Broker broker = Broker.createBroker(Integer.valueOf(id), brokerInfoString);
if (broker != null)
brokerList.add(broker.connectionString());
props.put("serializer.class", KAFKA_STRING_ENCODER);
props.put("metadata.broker.list", String.join(",", brokerList));
producer = new Producer<String, String>(new ProducerConfig(props));
【讨论】:
以上是关于Kafka:从 ZooKeeper 获取代理主机的主要内容,如果未能解决你的问题,请参考以下文章
spring boot kafka 在将 testcontainers 与 kafka、zookeeper、模式注册表一起使用时因“代理可能不可用”而失败