如何暂停卡夫卡消费者?
Posted
技术标签:
【中文标题】如何暂停卡夫卡消费者?【英文标题】:How to pause a kafka consumer? 【发布时间】:2018-03-08 21:31:55 【问题描述】:我在我的框架中使用 Kafka 生产者 - 消费者模型。在消费者端消费的记录稍后会被索引到 elasticsearch 上。在这里我有一个用例,如果 ES 关闭,我将不得不暂停 kafka 消费者,直到 ES 启动,一旦启动,我需要恢复消费者并使用我上次离开的记录。 我认为@KafkaListener 无法做到这一点。谁能给我一个解决方案?我发现我需要为此编写自己的 KafkaListenerContainer,但我无法正确实现它。任何帮助将不胜感激。
【问题讨论】:
如果您在记录被索引到 ES 后在消费者端提交偏移量,那么您不必担心“从我上次离开的地方消费记录”(您的第二个问题)。当 ES 关闭时,您将无法索引,您不会提交偏移量,因此 kakfka 将再次重试您将收到相同的消息。 【参考方案1】:有几种可能的解决方案,一种简单的方法是使用 KafkaConsumer API。在 KafkaConsumer 中,实现跟踪主题的位置,下次调用 poll(...) 时将检索该位置。您的问题是从 Kafka 获取记录后,您可能无法将其插入 Elastic Search。在这种情况下,您必须编写一个例程来重置消费者的位置,在您的情况下将是 consumer.seek(partition, consumer.position(partition)-1)。这会将位置重置为较早的位置。此时,一个好的方法是暂停分区(这将使服务器能够进行一些资源清理),然后轮询 ES(通过您想要的任何机制)。一旦 ES 可用,就调用消费者的 resume 并继续您通常的轮询插入周期。
讨论后编辑
使用指定的生命周期方法创建一个 spring bean。在 bean 的初始化方法中实例化您的 KafkaConsumer(从任何来源检索消费者的配置)。从方法开始一个线程与消费者交互并更新 ES,其余的设计如上。这是一个单线程模型。为了获得更高的吞吐量,请考虑将从 Kafka 检索到的数据保存在内存队列中的小型内存队列中,并使用调度程序线程来获取消息并将其提供给池化线程以更新 ES。
【讨论】:
我了解如何做到这一点的理论,想到了一些与您的答案非常接近的东西,我的问题在于实施。能否请您发布一个相同的代码 sn-p? 你可能不能用注解来做到这一点,如果你使用KafkaConsumer没问题,实现很简单,所有列出的方法都可用。您在实施中遇到的问题 我的 Consumer 是一个使用 @KafkaListener 注解的 POJO 监听器。我有一个用于此消费者配置的 ConsumerConfig 类。我的监听方法如下:@KafkaListener(topicPattern = KafkaConsumerConfig.TEST_TOPIC_ID, containerFactory = "kafkaListenerContainerFactory") public void process(ConsumerRecord, ?> record) logger.info("record: "+record);我不知道从哪里获取“消费者”对象以应用搜索和其他方法。 据我了解,您使用 Spring 的开发模型给您带来了问题,因为您没有直接使用 KafkaConsumer 的灵活性。您正在部署哪个容器。 是的,那么我该如何在 SPRING 中实现呢?【参考方案2】:我建议宁愿暂停消费者,为什么你不能一次又一次地重试同一条消息,并在消息被成功消费后提交偏移量。
例如:
用@Retryable
注释你的方法
并使用 try/catch 阻止您的方法,并在 catch 块中抛出新异常。
对于 ListenerFactory 配置添加属性:
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setAckOnError(false);
【讨论】:
【参考方案3】:有几种方法可以实现这一目标。
方法#1
在 Thread 中创建您的 KafkaConsumer
对象并运行无限 while
循环以使用事件。
完成此设置后,您可以中断线程并在while
循环中检查Thread.interrupt()
是否为true
。如果是,则跳出循环并关闭消费者。
完成恢复活动后,使用相同的组 ID 重新创建使用者。请注意,这可能会重新平衡消费者。
如果您使用 python,可以使用线程 stop_event
来实现同样的事情。
方法#2
使用 KafkaConumer API pause(partitions_list)
函数。它接受 Kafka 分区作为输入。因此,提取分配给消费者的所有部分并将这些部分传递给pause(partitions_list)
函数。消费者将停止从这些分区中提取数据。
一定时间后,可以使用resume(partitions_list)
函数恢复消费者。此方法不会重新平衡消费者。
注意:如果您使用的是 Spring Kafka 客户端。这变得容易多了。您可以启动/停止消息侦听器容器。
你可以找到详细的解释here。
【讨论】:
以上是关于如何暂停卡夫卡消费者?的主要内容,如果未能解决你的问题,请参考以下文章