卡夫卡消费者:受控阅读主题

Posted

技术标签:

【中文标题】卡夫卡消费者:受控阅读主题【英文标题】:Kafka Consumer : controlled reading from topic 【发布时间】:2017-03-01 10:14:50 【问题描述】:

我有以下 kafka 消费者代码,其中 3 个线程正在从具有 3 个分区的 kafka 主题中读取。

有什么办法,只有在线程当前正在处理的消息被处理后,才会从kafka主题中读取新消息。

例如,假设主题中有 100 条消息,那么有什么方法可以一次读取并处理 3 条消息。现在,当这 3 条消息被处理后,则只应读取接下来的 3 条消息,依此类推。

public void run(int a_numThreads) 
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

// now launch all the threads
//
executor = Executors.newFixedThreadPool(3);

// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) 
    executor.submit(new ConsumerTest(stream, threadNumber));
    threadNumber++;
   

【问题讨论】:

【参考方案1】:

如果 ConsumerTest 内部的迭代器正在同步处理消息,那么一次只会消耗 3 条消息。 enable.auto.commit 默认为 true。确保不要将其设置为 false,否则需要添加提交偏移量的逻辑。

前-

 ConsumerIterator<byte[], byte[]> streamIterator= stream.iterator(); 
 while (streamIterator.hasNext())  
   String kafkaMsg= new String(streamIterator.next().message()); 
  

【讨论】:

我已经设置了 properties.put("auto.commit.interval.ms", "1000");像这样。如果这是正确的? 是的,这是正确的。需要注意的一点是,这将每 1 秒提交一次偏移量。如果您的服务崩溃,那么在这 1 秒间隔内处理的所有消息都不会被提交,并且会在您的消费者再次启动时重新处理(设计服务时要考虑的边缘情况)。因此,您需要确保您的消费者逻辑对服务崩溃和重复消息处理具有容错能力。【参考方案2】:

好吧,默认情况下,消费者彼此不了解,因此无法“同步”他们的工作。您可以做的是将您的三条消息包装成一条消息(从而保证它们都将按顺序得到答复)或者可能引入更多(“子”)主题。

另一种可能性(如果您确实需要保证您的三个消息将被单个消费者消费)可能是您的所有消费者同步他们的工作,或者可能通知一个跟踪您的工作的控制器。

但是感觉就像你“做错了”,实际上队列中的消息是无状态的,只有它们在主题中的顺序决定了它们的“处理顺序”。处理消息的时间无关紧要。

【讨论】:

嗨 Nikolas,我不关心消息的排序。我想要的是只应从主题中读取 N 条消息,然后当这些 N 条消息的所有处理结束时,只应获取下一条 N 条消息。 啊,也许在那种情况下 seek(TopicPartition partition, long offset) 可以解决问题?但是您必须意识到,如果您“在错误的时间”请求偏移量,您可能会丢失数据。但是您可以获取所有数据,只读取您的 N 条消息并丢弃其余消息。然后只需使用新的偏移量 + N 再次触发您的队列? 这是一个非常好的方法。到目前为止,它对我来说很好用 另外,消费者必须在本地存储每个分区的最后消费偏移量。否则,它可能会丢失上次消耗偏移量的标记。

以上是关于卡夫卡消费者:受控阅读主题的主要内容,如果未能解决你的问题,请参考以下文章

消费者。如何指定要读取的分区? [卡夫卡]

卡夫卡消费者名单

卡夫卡消费者不返回任何事件

卡夫卡动物园管理员的目的

卡夫卡消费者不是从最新消息开始

卡夫卡消费者配置