Kafka - 使用高级消费者的延迟队列实现

Posted

技术标签:

【中文标题】Kafka - 使用高级消费者的延迟队列实现【英文标题】:Kafka - Delayed Queue implementation using high level consumer 【发布时间】:2015-10-24 20:02:03 【问题描述】:

想要使用高级消费者 api 实现延迟消费者

主要思想:

按键生成消息(每个消息都包含创建时间戳)这确保每个分区都按生成时间排序消息。 auto.commit.enable=false(将在每个消息处理后显式提交) 消费一条消息 检查消息时间戳并检查是否已经过了足够的时间 处理消息(此操作永远不会失败)

提交 1 个偏移量

while (it.hasNext()) 
  val msg = it.next().message()
  //checks timestamp in msg to see delay period exceeded
  while (!delayedPeriodPassed(msg))  
     waitSomeTime() //Thread.sleep or something....
  
  //certain that the msg was delayed and can now be handled
  Try  process(msg)  //the msg process will never fail the consumer
  consumer.commitOffsets //commit each msg

对此实现的一些担忧:

    提交每个偏移量可能会减慢 ZK 速度 consumer.commitOffsets 可以抛出异常吗?如果是,我将使用相同的消息两次(可以用幂等消息解决) 等待很长时间而不提交偏移量的问题,例如延迟时间为 24 小时,将从迭代器获取下一个,休眠 24 小时,处理并提交(ZK 会话超时?) ZK 会话如何在不提交新偏移量的情况下保持活动状态? (设置一个 hive zookeeper.session.timeout.ms 可以解决死消费者而不识别它) 还有其他问题我遗漏了吗?

谢谢!

【问题讨论】:

1.从 0.8.2 you can commit offsets to kafka 开始(尽管 zk 仍然被广泛使用) 2. 是的,这是根本问题(注意只处理一次) 3. 你的 zk 会话将过期(如果你有很多组消息中的消费者可能会从原始消费者重新平衡)。坦率地说,如果您每天有 1 条消息,kafka 听起来不太合适 我有很多消息(比如说 ~10k rpm),但在某些情况下,我想延迟消息消耗(例如,在某些消息处理失败后有延迟重试机制)。如果发生再平衡,这仍然有效,新的消费者将延迟消息 在这种情况下,您最终会在许多消费者身上安排消息:A 消费消息 1,安排它在 24 小时内运行,提交偏移量和会话已过期. B 启动,使用相同的消息,安排它在 24 小时内运行,......最终,这将像病毒一样传播。如果你提交消息,它可能会在消费者错误的情况下丢失,你可以选择你喜欢的任何一个(我个人会选择后一个,它简化了语义)。是否可以选择忙着等待? 我没有安排在 24 小时内运行。我检查提交的时间(消息的一部分)并检查当前时间,看看是否已经过了 24 小时。这样它就不会像病毒一样“传播”并被消耗掉。如何设置会话不过期? zookeeper.session.timeout.ms参数,默认设置为6秒,但设置为极值听起来像是在滥用技术(zk将无法跟踪哪些消费者实际上因此而死亡)。 【参考方案1】:

解决此问题的一种方法是使用不同的主题来推送所有要延迟的消息。如果所有延迟的消息都应该在相同的时间延迟之后处理,这将是相当简单的:

while(it.hasNext()) 
    val message = it.next().message()
    
    if(shouldBeDelayed(message)) 
        val delay = 24 hours
        val delayTo = getCurrentTime() + delay
        putMessageOnDelayedQueue(message, delay, delayTo)
    
    else 
       process(message)
    

    consumer.commitOffset()

现在将尽快处理所有常规消息,而将需要延迟的消息放在另一个主题上。

好消息是我们知道延迟主题头部的消息是应该首先处理的消息,因为它的 delayTo 值将是最小的。因此,我们可以设置另一个读取头消息的消费者,检查时间戳是否在过去,如果是,则处理消息并提交偏移量。如果不是,它不会提交偏移量,而是会一直休眠到那个时间:

while(it.hasNext()) 
    val delayedMessage = it.peek().message()
    if(delayedMessage.delayTo < getCurrentTime()) 
        val readMessage = it.next().message
        process(readMessage.originalMessage)
        consumer.commitOffset()
     else 
        delayProcessingUntil(delayedMessage.delayTo)
    

如果有不同的延迟时间,您可以按延迟划分主题(例如 24 小时、12 小时、6 小时)。如果延迟时间比那个更动态,它会变得更复杂一些。您可以通过引入两个延迟主题来解决它。从延迟主题A 中读取所有消息,并处理所有delayTo 值为过去的消息。在其他人中,您只需找到最接近delayTo 的人,然后将它们放在主题B 上。休眠直到最接近的消息应该被处理并反向执行,即处理来自主题 B 的消息,并将不应该处理的一次放回主题 A

回答您的具体问题(一些已在您的问题的 cmets 中解决)

    提交每个偏移量可能会减慢 ZK 速度

您可以考虑切换到将偏移量存储在 Kafka 中(从 0.8.2 开始提供的功能,请查看使用者配置中的 offsets.storage 属性)

    consumer.commitOffsets 可以抛出异常吗?如果是,我将使用相同的消息两次(可以用幂等消息解决)

我相信它可以,例如,如果它不能与偏移存储进行通信。正如你所说,使用幂等消息可以解决这个问题。

    等待很长时间而不提交偏移量的问题,例如延迟时间为 24 小时,将从迭代器获取下一个,休眠 24 小时,处理并提交(ZK 会话超时?)

除非处理消息本身的时间超过会话超时,否则上述解决方案不会出现问题。

    ZK 会话如何在不提交新偏移量的情况下保持活动状态? (设置一个 hive zookeeper.session.timeout.ms 可以解决死消费者而不识别它)

同样,您不需要设置长时间的会话超时。

    还有其他我遗漏的问题吗?

总会有;)

【讨论】:

感谢您的详细回答。为什么使用 it.peek().message() 而不是 it.next() ? ConsumerIterator.peek() 继承自 IteratorTemplate 不会更改 ConsumerIterator 中的任何内容。在调用 ConsumerIterator.next() 方法之前,它将始终为您提供相同的值。比较:github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/… 和 github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/…。简而言之,它不会将迭代器向前移动。 我找不到 ConsumerIterator 类。它是否仍然存在于最新版本的 kafka 中? @alexlipa 该特定类似乎已被删除。我可以看到它的最新版本是 0.7,目前版本最高为 2.0,中间有很多更新。 让侦听器线程进入睡眠状态直到延迟结束并不是一个好主意。您可以快速耗尽所有侦听器线程。【参考方案2】:

在您的情况下,我会建议另一条路线。

在消费者的主线程中解决等待时间是没有意义的。这将是如何使用队列的反模式。从概念上讲,您需要尽可能快地处理消息并将队列保持在低负载率。

相反,我会使用调度程序来为您需要延迟的每条消息安排作业。通过这种方式,您可以处理队列并创建将在预定义时间点触发的异步作业。

使用这种技术的缺点是它对将计划作业保存在内存中的 JVM 的状态是敏感的。如果该 JVM 失败,您将失去计划的作业,并且您不知道该任务是否已执行。

有一些调度程序实现,但可以配置为在集群环境中运行,从而确保您免受 JVM 崩溃的影响。

看看这个java调度框架:http://www.quartz-scheduler.org/

【讨论】:

“安排工作”很难做到……这增加了复杂性,但最终会奏效。我正在寻找简单的东西 使用 Tibco EMS 或其他 JMS 队列。他们内置了重试延迟。 Kafka 可能不是您正在做的事情的正确设计选择。 @Nimrod007,我同意。【参考方案3】:

使用 Tibco EMS 或其他 JMS 队列。他们内置了重试延迟。 Kafka 可能不是您正在做的事情的正确设计选择

【讨论】:

这是正确的答案。我很惊讶有多少人认为 Kafka 是一个通用消息队列。【参考方案4】:

按计划键控列表或其 redis 替代方案可能是最佳方法。

【讨论】:

【参考方案5】:

我们在其中一项任务中遇到了同样的问题。虽然最终在不使用延迟队列的情况下解决了这个问题,但在探索解决方案时,我们发现最好的方法是使用KafkaConsumer API 提供的pauseresume 功能。这种方法及其动机在这里得到了完美的描述:https://medium.com/naukri-engineering/retry-mechanism-and-delay-queues-in-apache-kafka-528a6524f722

【讨论】:

以上是关于Kafka - 使用高级消费者的延迟队列实现的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMq高级特性之延迟队列 通俗易懂 超详细 内含案例

加米谷:Kafka OffsetMonitor:监控消费者和延迟的队列

延时队列常用实现详解

golang实现本地延迟队列

RabbitMQ如何实现延迟队列?

记一次kafka消费延迟造成的生产问题