Reading Topic Metadata Stored in ZooKeeper


1 Problems

  • Topic metadata can only be fetched through a producer handle.
  • When the producer and the consumer share objects (configuration, handles), the consumer can fail to read any data.

2 Solution

Encapsulate the metadata-fetching code in a standalone class, so that the producer and the consumer share no variables.
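
The intended usage then looks like the following minimal sketch (KafkaHelper is the class defined in section 3.1; section 3.2 shows the full program):

#include <iostream>
#include <string>
#include "helper/kafka_helper.h"

int main() {
  // Resolve the broker list from ZooKeeper and dump the topic metadata.
  // All producer-side state lives inside KafkaHelper, so nothing is
  // shared with the consumer created later.
  std::string brokers = KafkaHelper::Brokers("localhost:2181");
  std::cout << "brokers: " << brokers << std::endl;
  KafkaHelper::PrintTopicMeta("test2");
  return 0;
}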

3 Code

3.1 The KafkaHelper class

#ifndef KAFKA_HELPER_H_
#define KAFKA_HELPER_H_

#include <string>
#include <iostream>  // std::cout, std::cerr
#include <cstdio>    // fprintf, sprintf, printf
#include <cstdlib>   // exit
#include <cstring>   // strncmp, strlen
using std::string;

#include "librdkafka/rdkafkacpp.h"
#include "librdkafka/rdkafka.h"
#include <zookeeper/zookeeper.h>
#include <zookeeper/zookeeper.jute.h>
#include <jansson.h>

#define BROKER_PATH "/brokers/ids"

// C-level handle shared with the ZooKeeper watcher so that broker list
// updates can be pushed into a running client.
static rd_kafka_t *rk;

class KafkaHelper {
 public:
  static string Brokers(string const& zookeeper) {
    zhandle_t * zh = initialize_zookeeper(zookeeper);
    char brokers[1024];
    brokers[0] = '\0';  // stays empty if no brokers are found
    set_brokerlist_from_zookeeper(zh, brokers);
    return brokers;
  }

  static void PrintTopicMeta(string const& topic_name) {
    /*
     * Create producer using accumulated global configuration.
     */
    RdKafka::Conf * global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    string zookeeper("localhost:2181");
    string brokers = KafkaHelper::Brokers(zookeeper);
    string errstr;
    global_conf->set("metadata.broker.list", brokers, errstr);
    RdKafka::Conf * topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    RdKafka::Producer *producer = RdKafka::Producer::create(global_conf, errstr);
    if (!producer) {
      std::cerr << "Failed to create producer: " << errstr << std::endl;
      exit(1);
    }

    std::cout << "% Created producer " << producer->name() << std::endl;

    /*
     * Create topic handle.
     */
    RdKafka::Topic * topic = RdKafka::Topic::create(producer, topic_name, topic_conf, errstr);

    if (!topic) {
      std::cerr << "Failed to create topic: " << errstr << std::endl;
      exit(1);
    }

    bool run = true;
    while (run) {
      class RdKafka::Metadata *metadata;

      // Fetch metadata; all_topics is false because a topic handle is
      // given, so only this topic's metadata is requested.
      RdKafka::ErrorCode err = producer->metadata(topic == NULL, topic, &metadata, 5000);
      if (err != RdKafka::ERR_NO_ERROR) {
        std::cerr << "%% Failed to acquire metadata: "
                  << RdKafka::err2str(err) << std::endl;
        run = false;
        break;
      }

      KafkaHelper::PrintMeta(topic_name, metadata);

      delete metadata;
      run = false;
    }

    // Release the producer-side objects; nothing is kept around to be
    // shared with the consumer.
    delete topic;
    delete producer;
    delete topic_conf;
    delete global_conf;
  }

  static void PrintMeta(string const & topic, const RdKafka::Metadata *metadata) {
    std::cout << "Metadata for " << (topic.empty() ? "" : "all topics")
              << "(orig broker id from broker "  << metadata->orig_broker_id()
              << ":" << metadata->orig_broker_name() << std::endl;

    /* Iterate brokers */
    std::cout << " " << metadata->brokers()->size() << " brokers:" << std::endl;
    RdKafka::Metadata::BrokerMetadataIterator ib;
    for (ib = metadata->brokers()->begin(); ib != metadata->brokers()->end(); ++ib) {
      std::cout << "  broker " << (*ib)->id() << " at " 
                << *(*ib)->host() << ":" << (*ib)->port() << std::endl;
    }
    /* Iterate topics */        
    std::cout << metadata->topics()->size() << " topics:" << std::endl;
    RdKafka::Metadata::TopicMetadataIterator it;
    for (it = metadata->topics()->begin(); it != metadata->topics()->end(); ++it) {
      std::cout << "  topic "<< *(*it)->topic() << " with " 
                << (*it)->partitions()->size() << " partitions" << std::endl;

      if ((*it)->err() != RdKafka::ERR_NO_ERROR) {
        std::cout << " " << err2str((*it)->err());
        if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE) {
          std::cout << " (try again)";
        }       
      }
      std::cout << std::endl;

      /* Iterate the topic's partitions */
      RdKafka::TopicMetadata::PartitionMetadataIterator ip;
      for (ip = (*it)->partitions()->begin(); ip != (*it)->partitions()->end(); ++ip) {
        std::cout << "    partition " << (*ip)->id()
                  << " leader " << (*ip)->leader()
                  << ", replicas: ";

        /* Iterate the partition's replicas */
        RdKafka::PartitionMetadata::ReplicasIterator ir;
        for (ir = (*ip)->replicas()->begin(); 
             ir != (*ip)->replicas()->end() ; 
             ++ir) {
          std::cout << (ir == (*ip)->replicas()->begin() ? "" : ",") << *ir;
        }

        /* Iterate the partition's ISRs (in-sync replicas) */
        std::cout << ", isrs: ";
        RdKafka::PartitionMetadata::ISRSIterator iis;
        for (iis = (*ip)->isrs()->begin(); iis != (*ip)->isrs()->end() ; ++iis)
          std::cout << (iis == (*ip)->isrs()->begin() ? "" : ",") << *iis;

        if ((*ip)->err() != RdKafka::ERR_NO_ERROR)
          std::cout << ", " << RdKafka::err2str((*ip)->err()) << std::endl;
        else
          std::cout << std::endl;
      }
    }
  }


 private:
  static void watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {
    char brokers[1024];
    if (type == ZOO_CHILD_EVENT && strncmp(path, BROKER_PATH, sizeof(BROKER_PATH) - 1) == 0)
      {
        brokers[0] = '\0';
        set_brokerlist_from_zookeeper(zh, brokers);
        if (brokers[0] != '\0' && rk != NULL)
          {
            rd_kafka_brokers_add(rk, brokers);
            rd_kafka_poll(rk, 10);
          }
      }
  }


  static zhandle_t* initialize_zookeeper(string const& zookeeper) {
    zhandle_t * zh = zookeeper_init(zookeeper.c_str(), watcher, 10000, 0, 0, 0);
    if (zh == NULL) {
      fprintf(stderr, "Zookeeper connection not established.");
      exit(1);
    }
    return zh;
  }

  static void set_brokerlist_from_zookeeper(zhandle_t *zzh, char *brokers) {
    if (zzh) {
      struct String_vector brokerlist;
      if (zoo_get_children(zzh, BROKER_PATH, 1, &brokerlist) != ZOK) {
        fprintf(stderr, "No brokers found on path %s
", BROKER_PATH);
        return;
      }

      int i;
      char *brokerptr = brokers;
      for (i = 0; i < brokerlist.count; i++) {
        char path[255], cfg[1024];
        sprintf(path, "/brokers/ids/%s", brokerlist.data[i]);
        int len = sizeof(cfg);
        zoo_get(zzh, path, 0, cfg, &len, NULL);

        if (len > 0) {
          cfg[len] = '\0';
          json_error_t jerror;
          json_t *jobj = json_loads(cfg, 0, &jerror);
          if (jobj) {
            json_t *jhost = json_object_get(jobj, "host");
            json_t *jport = json_object_get(jobj, "port");

            if (jhost && jport) {
              const char *host = json_string_value(jhost);
              const int   port = json_integer_value(jport);
              sprintf(brokerptr, "%s:%d", host, port);

              brokerptr += strlen(brokerptr);
              if (i < brokerlist.count - 1) {
                *brokerptr++ = ',';
              }
            }
            json_decref(jobj);
          }
        }
      }
      deallocate_String_vector(&brokerlist);
      printf("Found brokers %s
", brokers);
    }
  }
};

#endif
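
For reference, each broker znode under /brokers/ids stores a small JSON document whose "host" and "port" fields are what set_brokerlist_from_zookeeper extracts. The following standalone sketch shows just that jansson parsing step; the sample JSON string is illustrative, not captured from a live cluster:

#include <cstdio>
#include <jansson.h>

int main() {
  // Illustrative payload of a broker znode such as /brokers/ids/0.
  const char *cfg = "{\"host\":\"localhost\",\"port\":9092}";

  json_error_t jerror;
  json_t *jobj = json_loads(cfg, 0, &jerror);
  if (jobj) {
    json_t *jhost = json_object_get(jobj, "host");
    json_t *jport = json_object_get(jobj, "port");
    if (jhost && jport) {
      // Prints "localhost:9092", the host:port form appended to the
      // broker list.
      printf("%s:%d\n", json_string_value(jhost),
             (int)json_integer_value(jport));
    }
    json_decref(jobj);
  }
  return 0;
}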

3.2 Complete main.cc

This file also contains the code that consumes the messages.

#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
#include <getopt.h>
#include <list>
#include "helper/kafka_helper.h"

using std::string;
using std::list;
using std::cout;
using std::endl;

static bool run = true;
static bool exit_eof = true;

class MyEventCb : public RdKafka::EventCb {
public:
  void event_cb (RdKafka::Event &event) {
    switch (event.type())
      {
      case RdKafka::Event::EVENT_ERROR:
        std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
          event.str() << std::endl;
        if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
          run = false;
        break;

      case RdKafka::Event::EVENT_STATS:
        std::cerr << ""STATS": " << event.str() << std::endl;
        break;

      case RdKafka::Event::EVENT_LOG:
        fprintf(stderr, "LOG-%i-%s: %s
",
                event.severity(), event.fac().c_str(), event.str().c_str());
        break;

      default:
        std::cerr << "EVENT " << event.type() <<
          " (" << RdKafka::err2str(event.err()) << "): " <<
          event.str() << std::endl;
        break;
      }
  }
};


void msg_consume(RdKafka::Message* message, void* opaque) {
  switch (message->err()) {
  case RdKafka::ERR__TIMED_OUT:
    break;

  case RdKafka::ERR_NO_ERROR:
    /* Real message */
    std::cout << "Read msg at offset " << message->offset() << std::endl;
    if (message->key()) {
      std::cout << "Key: " << *message->key() << std::endl;
    }
    // The payload is not NUL-terminated, so print exactly len() bytes.
    cout << string(static_cast<const char *>(message->payload()), message->len()) << endl;
    break;

  case RdKafka::ERR__PARTITION_EOF:
    cout << "reach last message" << endl;
    /* Last message */
    if (exit_eof) {
      run = false;
    }
    break;

  case RdKafka::ERR__UNKNOWN_TOPIC:
  case RdKafka::ERR__UNKNOWN_PARTITION:
    std::cerr << "Consume failed: " << message->errstr() << std::endl;
    run = false;
    break;

  default:
    /* Errors */
    std::cerr << "Consume failed: " << message->errstr() << std::endl;
    run = false;
  }
}


class MyConsumeCb : public RdKafka::ConsumeCb {
public:
  void consume_cb (RdKafka::Message &msg, void *opaque) {
    msg_consume(&msg, opaque);
  }
};

static void sigterm (int sig) {
  run = false;
}



int main (int argc, char **argv) {
  /*
   * Process kill signal, quit from the loop
   */
  signal(SIGINT, sigterm);
  signal(SIGTERM, sigterm);

  /*
   * Get broker list from zookeeper
   */
  string zookeeper("localhost:2181");
  string brokers = KafkaHelper::Brokers(zookeeper);
  cout << "brokers from zookeeper is: " << brokers << endl;

  string topic_name = "test2";
  /*
   * Print topic meta
   */
  KafkaHelper::PrintTopicMeta(topic_name);

  /*
   * Global conf objects
   */
  RdKafka::Conf * global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  string errstr;
  global_conf->set("metadata.broker.list", brokers, errstr);
  MyEventCb ex_event_cb;
  global_conf->set("event_cb", &ex_event_cb, errstr);

  /*
   * Topic conf objects
   */
  RdKafka::Conf * topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

  /*
   * Create consumer using accumulated global configuration.
   */
  RdKafka::Consumer *consumer = RdKafka::Consumer::create(global_conf, errstr);
  if (!consumer) {
    std::cerr << "Failed to create consumer: " << errstr << std::endl;
    exit(1);
  }

  std::cout << "% Created consumer " << consumer->name() << std::endl;

  /*
   * Start consumer for topic+partition at start offset
   */
  int32_t partition = 0;
  int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;

  RdKafka::Topic *topic2 = RdKafka::Topic::create(consumer, topic_name, topic_conf, errstr);
  if (!topic2) {
    std::cerr << "Failed to create topic: " << errstr << std::endl;
    exit(1);
  }
  RdKafka::ErrorCode resp = consumer->start(topic2, partition, start_offset);
  if (resp != RdKafka::ERR_NO_ERROR) {
    std::cerr << "Failed to start consumer: " <<
      RdKafka::err2str(resp) << std::endl;
    exit(1);
  }


  /*
   * Consume messages
   */
  MyConsumeCb ex_consume_cb;
  int use_ccb = 0;  // set to 1 to consume through the callback interface
  while (run) {
    if (use_ccb) {
      consumer->consume_callback(topic2, partition, 1000, &ex_consume_cb, &use_ccb);
    } else {
      RdKafka::Message *msg = consumer->consume(topic2, partition, 1000);
      msg_consume(msg, NULL);
      delete msg;
    }
    consumer->poll(0);
  }  

  /*
   * Stop consumer
   */
  consumer->stop(topic2, partition);
  consumer->poll(1000);

  delete topic2;
  delete consumer;

  /*
   * Wait for RdKafka to decommission.
   * This is not strictly necessary, but it lets RdKafka release all of its
   * resources before the application exits, so that memory profilers such
   * as valgrind won't report leaks.
   */
  RdKafka::wait_destroyed(5000);

  return 0;
}

Created: 2016-05-02 Mon 13:07

