使用 Kafka 作为 EventStore 时在 Flink 中恢复状态一致性

Posted

技术标签:

【中文标题】使用 Kafka 作为 EventStore 时在 Flink 中恢复状态一致性【英文标题】:Recovering state consistency in Flink when using Kafka as EventStore 【发布时间】:2017-02-07 10:07:07 【问题描述】:

问题

我正在将微服务实现为事件源聚合,而该聚合又被实现为 Flink FlatMapFunction。在基本设置中,聚合从两个 kafka 主题中读取事件和命令。然后,它将新事件写入第一个主题并在第三个主题中处理结果。因此,Kafka 充当事件存储。希望这张图有帮助:

  RPC Request                              RPC Result
  |                                                 |
  ~~~~> Commands-|              |---> Results ~~~~~~|
                 |-->Aggregate--|
  ~> Input evs. -|              |---> output evs. ~~~
  |                                                 |
  ~~~~~<~~~~~~~~~~~<~~~feedbak loop~~~~~<~~~~~~~~<~~~

由于 Kafka 没有检查点,因此命令可能会被重播两次,并且输出事件似乎也可以被写入主题的两次。

在那些重复消息的情况下如何恢复状态?聚合是否可以知道其输入流何时是最新的以开始处理命令?

我的想法

我想了几个解决方案:

    如果 Flink 实现了回滚未确认事件,则可以实现一个 Sink,它将从事件源获取当前偏移量。重新启动时,此接收器将删除 kafka 主题中的新偏移事件。按照他的方式,KafkaSource 和 KafkaSink 将从同一个构建器生成,然后暴露给拓扑。鉴于其他服务可能读取主题中较新的事件并导致不一致,此解决方案存在一个严重问题。

    如果在 2 中无法从 Flink 中删除事件,则有状态源可能会从偏移量中读取事件,并尝试匹配聚合中的重复事件并删除它们。此选项似乎不可靠,因为在某些情况下补丁不是确定性的并且容易受到缺陷的影响,因为应该针对每个聚合和拓扑重新考虑它,并且它不能保证恢复(例如,在连续重启的情况下)。因此,这是一个糟糕的解决方案。

    这是一种不同的方法。就是创建一个特殊的KafkaSource,有两个特殊的watermark:第一个,KafkaSourceStartedWatermark,在source启动时会一直发送,通知依赖的operator。当发送此水印时,源内部会记录当前的 Kafka 偏移量。第二个,KafkaSourceUpToDateWatermark,在达到偏移量时由源发送。这些水印将透明地沿着拓扑传播。操作员应该能够处理这些水印,实现一个特殊的 WatermarkNotifiable 接口。然后,聚合将能够缓冲或丢弃 RPC 命令,直到它在每个输入源中都是最新的。

    interface WatermarkNotifiable  
        void started(String watermarkId);//KafkaSourceStartedWatermark watermark
        void upToDate(String watermarkId);//KafkaSOurceUpToDateWatermark watermark
      
    
    1234563那么。

    其他不同的方法是不处理比条件更早的命令。例如,命令具有入口时间戳。如果使用时间,则时间同步至关重要。

*** 相关问题

    Using Kafka as a (CQRS) Eventstore. Good idea? Kafka - Know if Consumer is up to date Kafka & Flink duplicate messages on restart

【问题讨论】:

【参考方案1】:

创建一个新的 Conmuter 运算符类型。这就像一个来源。它由几个代表事件和命令主题的源组成。它以“恢复”状态开始。在这种状态下,它会从事件主题中读取最新的内容。同时,对于命令,它存储或删除它们。一旦更新,它会认为已恢复并“打开”命令的方式。它可以单独实现为一个源加一个操作符。

FlinkKafkaProducerXX 不足以做到这一点,但它会是实现它的基础。

【讨论】:

以上是关于使用 Kafka 作为 EventStore 时在 Flink 中恢复状态一致性的主要内容,如果未能解决你的问题,请参考以下文章

从 eventStore 获取特定数量的事件

使用EventStore,我可以创建一个新的iCal日历类型吗?

使用 JOliver EventStore 更新多个聚合

应用地图功能时在同一类上获取Kafka Streams Class Cast异常

Prooph Eventstore (PDO) 和 Doctrine DBAL 导致多个连接

分享一个CQRS/ES架构中基于写文件的EventStore的设计思路