kafka客户端API问题

Posted

技术标签:

【中文标题】kafka客户端API问题【英文标题】:kafka Client Api questions 【发布时间】:2017-05-13 17:07:07 【问题描述】:

谁能帮我解决以下问题。我正在使用 kafka-clients-0.10.1.1(单节点单代理)

auto.create.topics.enable 的默认值为 true。

1.我正在使用

向主题发送消息
    kafkaProdcuer<String,String> producer> producer...
    producer.send(new ProducerRecord<String, String>("my- topic","message"));
    producer.close();

消费:

    kafkaConsumer<String,String> consumer....
    consumer.subscribe(Arrays.asList("my-topic"));
    ConsumerRecords<String, String> records = consumer.poll(200);

    while(true)
     for (ConsumerRecord<String, String> record : records) 
            System.out.println(record.value());
         
     

问题是当我第一次运行消费者时,它没有得到值。而且我必须运行生产者并再次运行消费者以获取值。有时我必须运行生产者 3 次。 为什么会这样?

2.) enable.auto.commit=false

如果 enable.auto.commit 属性为 false,同一消费者能否多次阅读消息?

3.) 考虑我在第一点的消费者代码。我如何打破循环我的意思是消费者如何知道它已读取所有消息然后调用 consumer.close()

【问题讨论】:

kafka bin里有一个console-consumer,你自己的consumer不能消费数据的时候可以试试。如果可能,请尝试添加 producer.flush() 。对于您的问题 3,流式处理程序无法知道批处理的结束,但您可以设置超时线程来监控超时而不消耗数据。 是的,我用 bin 消费者对其进行了测试,它在获取相关 id 为 1 的元数据时出现错误:my-topic-106=LEADER_NOT_AVAILABLE (org.apache.kafka.clients.网络客户端) 您最近是否在消费数据之前生产过数据?默认情况下,Kafka 只会将您的数据存储 3 天。 【参考方案1】:

1) 你总是在消费者中使用相同的 group.id 吗?你是在消费前生产吗?这可能与消费者群体和偏移管理有关。请参阅this answer about consumer offset behavior。

2) 不确定您的意思是有意还是无意地阅读重复内容。只要该消息由于主题保留策略而未被删除,您始终可以再次阅读相同的消息以寻找该位置。如果你的意思是意外,自动提交设置为 false 只是意味着消费者不会为你提交偏移量,你必须手动调用 commitSync() 或 commitAsync()。在任何情况下,您的消费者仍有可能在提交之前处理消息并崩溃,在这种情况下,当消费者恢复时,它将再次读取那些已处理但未提交的消息。如果您只想要一次语义,则必须执行其他操作,例如使用已处理的消息以原子方式存储偏移量。

3) 正如Lhfcws 提到的,在流中没有像“所有消息”这样的概念。您可以做的一些事情(技巧)是:

您可以检查 poll 返回的记录列表是否为空,并且在配置的次数后中断循环并退出。 如果消息是有序的(您正在从单个分区读取),您可以发送一种特殊的 END_OF_DATA 消息,当您看到它时,您会关闭消费者。 您可以让消费者读取一些消息然后退出,下次它会从上次提交的偏移量继续。

【讨论】:

感谢 Lhfcws 和 Luciano。我现在很清楚第 2 点和第 3 点。关于第 1 点,我在生产者之后立即运行消费者。我没有更改消费者组。我没有使用创建主题bin 实用程序。我假设代码 producer.send 将创建主题。 bootstrap.servers=localhost:9092 group.id=test enable.auto.commit=true jena84,尝试在消费者配置中将 auto.offset.reset 设置为“最早”,然后重试。此外,启动消费者后等待重新平衡完成。 太棒了!!!它起作用了。非常感谢。是不是因为您提供的关于偏移管理的链接中提到的原因。我也尝试将“最小”放在它不允许我。是因为新的消费者 API 吗? 很高兴它成功了。是的,新的消费者 API 使用“最早的”,而旧的消费者使用“最小的”。如果您使用的是 0.10 客户端,请注意使用“新消费者配置”(kafka.apache.org/documentation/#newconsumerconfigs)

以上是关于kafka客户端API问题的主要内容,如果未能解决你的问题,请参考以下文章

Kafka核心API——AdminClient API

Kafka核心API——AdminClient API

kafka----kafka API(java版本)

Kafka 连接或 Kafka 客户端

Kafka消息队列大数据实战教程-第四篇(Kafka客户端Producer API)

Kafka消息队列大数据实战教程-第四篇(Kafka客户端Producer API)