失败时优雅地重启 Reactive-Kafka Consumer Stream

Posted

技术标签:

【中文标题】失败时优雅地重启 Reactive-Kafka Consumer Stream【英文标题】:Gracefully restart a Reactive-Kafka Consumer Stream on failure 【发布时间】:2018-11-01 02:54:10 【问题描述】:

问题 当我重新启动/完成/停止流时,旧的消费者不会死/关机:

[INFO ] a.a.RepointableActorRef -
  Message [akka.kafka.KafkaConsumerActor$Internal$Stop$] 
  from Actor[akka://ufo-sightings/deadLetters]
  to Actor[akka://ufo-sightings/system/kafka-consumer-1#1896610594]
  was not delivered. [1] dead letters encountered.

说明 我正在构建一个从 Kafka 主题接收消息并通过 HTTP 请求将消息发送到外部服务的服务。

    与外部服务的连接可能断开,我的服务需要重试请求。

    此外,如果 Stream 中出现错误,则需要重新启动整个 Stream。

    最后,有时我不需要流及其对应的 Kafka-consumer,我想关闭整个流

所以我有一个流:

Consumer.committableSource(customizedSettings, subscriptions)
  .flatMapConcat(sourceFunction)
  .toMat(Sink.ignore)
  .run

http请求在sourceFunction发送

我遵循了新文档中的新 Kafka Consumer Restart 说明

  RestartSource.withBackoff(
      minBackoff = 20.seconds,
      maxBackoff = 5.minutes,
      randomFactor = 0.2 )  () =>
          Consumer.committableSource(customizedSettings, subscriptions)
            .watchTermination() 
                case (consumerControl, streamComplete) =>
                  logger.info(s" Started Watching Kafka consumer id = $consumer.id termination: is shutdown: $consumerControl.isShutdown, is f completed: $streamComplete.isCompleted")
                  consumerControl.isShutdown.map(_ => logger.info(s"Shutdown of consumer finally happened id = $consumer.id at $DateTime.now"))
                  streamComplete
                    .flatMap  _ =>
                      consumerControl.shutdown().map(_ -> logger.info(s"3.consumer id = $consumer.id SHUTDOWN at $DateTime.now GRACEFULLY:CLOSED FROM UPSTREAM"))
                    
                    .recoverWith 
                      case _ =>
                        consumerControl.shutdown().map(_ -> logger.info(s"3.consumer id = $consumer.id SHUTDOWN at $DateTime.now ERROR:CLOSED FROM UPSTREAM"))
                    
             
            .flatMapConcat(sourceFunction)
      
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(Sink.ignore)(Keep.left)
      .run

打开了一个issue,在一个复杂的 Akka 流中讨论了这个非终止消费者,但还没有解决方案。

是否有强制终止 Kafka Consumer 的解决方法

【问题讨论】:

【参考方案1】:

如何将消费者包装在一个 Actor 中并注册一个 KillSwitch,请参阅:https://doc.akka.io/docs/akka/2.5/stream/stream-dynamic.html#dynamic-stream-handling

然后在 Actor postStop 方法中,您可以终止流。 通过将 Actor 包装在 BackoffSupervisor 中,您可以获得指数退避。

示例演员:https://github.com/tradecloud/kafka-akka-extension/blob/master/src/main/scala/nl/tradecloud/kafka/KafkaSubscriberActor.scala#L27

【讨论】:

KillSwitch 也不起作用。上面的示例仅适用于简单的消费者流,如果 Stream 包含嵌套流,它将无法杀死 kafka ConsumerActor。但无论如何我都会试试这个

以上是关于失败时优雅地重启 Reactive-Kafka Consumer Stream的主要内容,如果未能解决你的问题,请参考以下文章

未定义变量时如何优雅地使管道失败

如何在不延迟任务的情况下优雅地重启 Celery

如何优雅地重启 django 在 nginx 后面运行 fcgi?

Spring Cache with Redis - 如果与 Redis 的连接失败,如何优雅地处理甚至跳过缓存

优雅重启 django-q qcluster

当按下 Ctrl+C 时,优雅地中止从 Windows Python Paramiko 脚本通过 SSH 执行的远程 Windows 命令