Kafka:从 ZooKeeper 获取代理主机

Posted

技术标签:

【中文标题】Kafka:从 ZooKeeper 获取代理主机【英文标题】:Kafka: Get broker host from ZooKeeper 【发布时间】:2015-06-11 23:06:24 【问题描述】:

出于特殊原因,我需要同时使用 ConsumerGroup(又名高级消费者)和 SimpleConsumer(又名低级消费者)来读取 Kafka。对于ConsumerGroup,我使用基于 ZooKeeper 的配置并且对它完全满意,但SimpleConsumer 需要实例化种子代理。

我不想同时保留 ZooKeeper 和代理主机的列表。因此,我正在寻找一种方法来从 ZooKeeper自动发现特定主题的代理

由于一些间接信息,我相信这些数据存储在 ZooKeeper 中的以下路径之一:

/brokers/topics/<topic>/partitions/<partition-id>/state /brokers/ids/

但是,当我尝试从这些节点读取数据时,出现序列化错误(我为此使用 com.101tec.zkclient):

org.I0Itec.zkclient.exception.ZkMarshallingError:java.io.StreamCorruptedException:无效流标头:7B226A6D 在 org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:37) 在 org.I0Itec.zkclient.ZkClient.derializable(ZkClient.java:740) 在 org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:773) 在 org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761) 在 org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750) 在 org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744) ... 64 省略 引起:java.io.StreamCorruptedException:无效的流标头:7B226A6D 在 java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804) 在 java.io.ObjectInputStream.(ObjectInputStream.java:299) 在 org.I0Itec.zkclient.serialize.TcclAwareObjectIputStream.(TcclAwareObjectIputStream.java:30) 在 org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:31) ... 69 更多

我可以毫无问题地编写和读取自定义 Java 对象(例如字符串),所以我相信这不是客户端的问题,而是棘手的编码问题。因此,我想知道:

    如果这是正确的方法,如何正确读取这些节点? 如果整个方法都是错误的,什么是正确的

【问题讨论】:

我发现有助于在 Zookeeper 数据中查看的一个工具是:code.google.com/p/zooviewer 【参考方案1】:

这就是我的一位同事获取 Kafka 代理列表的方式。当您想要动态获取经纪人列表时,我认为这是一种正确的方式。

这是一个示例代码,展示了如何获取列表。

public class KafkaBrokerInfoFetcher 

    public static void main(String[] args) throws Exception 
        ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null);
        List<String> ids = zk.getChildren("/brokers/ids", false);
        for (String id : ids) 
            String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null));
            System.out.println(id + ": " + brokerInfo);
        
    

在由三个代理组成的集群上运行代码会导致

1: "jmx_port":-1,"timestamp":"1428512949385","host":"192.168.0.11","version":1,"port":9093
2: "jmx_port":-1,"timestamp":"1428512955512","host":"192.168.0.11","version":1,"port":9094
3: "jmx_port":-1,"timestamp":"1428512961043","host":"192.168.0.11","version":1,"port":9095

【讨论】:

感谢和抱歉迟到的回复。您能否也发布对您在代码中使用的 ZooKeeper 客户端库的引用?在我这边,我想出了如何修复我描述的错误,并将使用 Kafka 库附带的 com.101toc.ZkClient 发布替代解决方案。无论如何,我接受你的回答是完全有效的。 应该是import org.apache.zookeeper.ZooKeeper; +1 太简单和太有效的方式来获取有关 kafka 的信息(brokes,topics,partitions per topic ...等)谢谢。它只需要import org.apache.zookeeper.ZooKeeper【参考方案2】:

事实证明,Kafka 使用ZKStringSerializer 将数据读写到 znode 中。因此,要修复错误,我只需将其添加为 ZkClient 构造函数中的最后一个参数:

val zkClient = new ZkClient(zkQuorum, Integer.MAX_VALUE, 10000, ZKStringSerializer)

使用它,我编写了几个有用的函数来发现经纪人 ID、他们的地址和其他东西:

import kafka.utils.Json
import kafka.utils.ZKStringSerializer
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.KafkaException


def listBrokers(): List[Int] = 
  zkClient.getChildren("/brokers/ids").toList.map(_.toInt)


def listTopics(): List[String] = 
  zkClient.getChildren("/brokers/topics").toList


def listPartitions(topic: String): List[Int] = 
  val path = "/brokers/topics/" + topic + "/partitions"
  if (zkClient.exists(path)) 
    zkClient.getChildren(path).toList.map(_.toInt)
   else 
    throw new KafkaException(s"Topic $topic doesn't exist")
  


def getBrokerAddress(brokerId: Int): (String, Int) = 
  val path = s"/brokers/ids/$brokerId"
  if (zkClient.exists(path)) 
    val brokerInfo = readZkData(path)
    (brokerInfo.get("host").get.asInstanceOf[String], brokerInfo.get("port").get.asInstanceOf[Int])
   else 
    throw new KafkaException("Broker with ID $brokerId doesn't exist")
  


def getLeaderAddress(topic: String, partitionId: Int): (String, Int) = 
  val path = s"/brokers/topics/$topic/partitions/$partitionId/state"
  if (zkClient.exists(path)) 
    val leaderStr = zkClient.readData[String](path)
    val leaderId = Json.parseFull(leaderStr).get.asInstanceOf[Map[String, Any]].get("leader").get.asInstanceOf[Int]
    getBrokerAddress(leaderId)
   else 
    throw new KafkaException(s"Topic ($topic) or partition ($partitionId) doesn't exist")
  

【讨论】:

kafka.utils.ZkUtils 是什么库? 'org.apache.kafka',名称:'kafka_2.13'?这意味着我们必须指定我们正在使用的 kafka 的版本?【参考方案3】:

使用 shell 执行此操作:

zookeeper-shell myzookeeper.example.com:2181
ls /brokers/ids
  => [2, 1, 0]
get /brokers/ids/2
get /brokers/ids/1
get /brokers/ids/0 

【讨论】:

【参考方案4】:

实际上,Kafka 中有 ZkUtils(至少对于 0.8.x 行),您可以使用一个小警告:您需要重新实现 ZkStringSerializer,它将字符串转换为 UTF-8 编码字节数组。如果您想使用 Java8 的流 API,可以通过 scala.collection.JavaConversions 迭代 Scala 集合。这对我的案子有帮助。

【讨论】:

【参考方案5】:
 public KafkaProducer(String zookeeperAddress, String topic) throws IOException,
        KeeperException, InterruptedException 

    this.zookeeperAddress = zookeeperAddress;
    this.topic = topic;

    ZooKeeper zk = new ZooKeeper(zookeeperAddress, 10000, null);
    List<String> brokerList = new ArrayList<String>();

    List<String> ids = zk.getChildren("/brokers/ids", false);
    for (String id : ids) 
        String brokerInfoString = new String(zk.getData("/brokers/ids/" + id, false, null));
        Broker broker = Broker.createBroker(Integer.valueOf(id), brokerInfoString);
        if (broker != null) 
            brokerList.add(broker.connectionString());
        
    

    props.put("serializer.class", KAFKA_STRING_ENCODER);
    props.put("metadata.broker.list", String.join(",", brokerList));
    producer = new Producer<String, String>(new ProducerConfig(props));

【讨论】:

以上是关于Kafka:从 ZooKeeper 获取代理主机的主要内容,如果未能解决你的问题,请参考以下文章

spring boot kafka 在将 testcontainers 与 kafka、zookeeper、模式注册表一起使用时因“代理可能不可用”而失败

如何列出集群中所有可用的 Kafka 代理?

kafka+zookeeper集群部署

启动多个 kafka 代理失败

部署Kafka群集

kafka集群配置和使用