Kafka 流式添加全局存储的用例

Posted

技术标签:

【中文标题】Kafka 流式添加全局存储的用例【英文标题】:Kafka streams use cases for add global store 【发布时间】:2020-03-20 15:13:44 【问题描述】:

在 kafka 流中定义拓扑时,可以添加全局状态存储。它需要一个源主题以及一个ProcessorSupplier。 处理器接收记录并可以在将它们添加到存储之前从理论上对其进行转换。但是在恢复的情况下,记录会直接从源主题(更改日志)插入到全局状态存储中,跳过处理器中完成的最终转换。

   +-------------+             +-------------+              +---------------+
   |             |             |             |              |    global     |
   |source topic  ------------->  processor  +-------------->    state      |
   |(changelog)  |             |             |              |    store      |
   +-------------+             +-------------+              +---------------+
          |                                                         ^
          |                                                         |
          +---------------------------------------------------------+
              record directly inserted during restoration

StreamsBuilder#addGlobalStore(StoreBuilder storeBuilder, String topic, Consumed consumed, ProcessorSupplier stateUpdateSupplier) 将全局 StateStore 添加到拓扑。

根据文档

注意:您不应使用处理器将转换后的记录插入到全局状态存储中。该存储使用源主题作为更改日志,并且在恢复期间将插入直接来自源的记录。这个 ProcessorNode 应该用来保持 StateStore 是最新的。

与此同时,主要错误目前在 kafka 错误跟踪器上打开:KAFKA-7663 Custom Processor supplied on addGlobalStore is not used when restoring state from topic 准确解释了文档中的说明,但似乎是一个公认的错误。

我想知道 KAFKA-7663 是否确实是一个错误。根据文档,它似乎是这样设计的,在这种情况下,我很难理解用例。 有人能解释一下这个低级 API 的主要用例吗?我唯一能想到的就是处理副作用,例如在处理器中执行一些日志操作。

额外问题:如果源主题作为全局存储的更新日志,当一条记录因为保留期已过期而从主题中删除时,它会从全局状态存储中删除吗?还是只有在从更改日志中完全恢复商店后才会在商店中进行删除。

【问题讨论】:

请注意,旧文档没有指出问题,我们只是将文档更新为“中间修​​复”。 【参考方案1】:

是的,这是一个很奇怪的小命令 22,但文档是正确的。全局状态存储的处理器不得对记录执行任何操作,而是将它们保存到存储中。

AFAIK,这不是一个哲学问题,而是一个实际问题。原因仅仅是您观察到的行为...... Streams 将输入主题视为存储的更改日志主题,因此在恢复期间绕过处理器(以及反序列化)。

状态恢复绕过任何处理的原因是通常更改日志中的数据与存储中的数据相同,因此对它执行任何新操作实际上都是错误的。另外,将字节从线路中取出并将它们批量写入状态存储会更有效。我说“通常”是因为在这种情况下,输入主题与普通的变更日志主题不完全一样,因为它在存储放置期间不会接收到它的写入。

对于它的价值,我也很难理解用例。看起来,我们应该:

    完全摆脱该处理器,始终将二进制数据从网络中转储到存储中,就像恢复一样。 重新设计全局存储以允许在全局存储之前进行任意转换。我们可以: 继续使用输入主题并在恢复期间反序列化和调用处理器,或者 为全局存储添加 real 更改日志,这样我们就可以轮询输入主题,应用一些转换,然后写入全局存储全局存储-更改日志。然后,我们可以使用更改日志(而不是输入)进行恢复和复制。

顺便说一句,如果您想要后一种行为,您现在可以通过应用您的转换来近似它,然后使用to(my-global-changelog) 来创建一个“更改日志”主题。然后,您将创建全局存储以从您的 my-global-changelog 读取而不是输入。

所以,给你一个直接的答案,KAFKA-7663 不是一个错误。我将对提议将其转换为功能请求的票发表评论。

奖励答案:充当状态存储更改日志的主题不得配置保留。实际上,这意味着您应该通过启用压缩来防止无限增长,并禁用日志保留。

在实践中,旧数据失去保留并被丢弃并不是“事件”,消费者无法知道它是否/何时发生。因此,无法从状态存储中删除数据以响应此非事件。正如您所描述的那样,它会发生……这些记录将无限期地存放在全球商店中。如果/当一个实例被替换时,新的实例将从输入中恢复,并且(显然)只接收当时存在于主题中的记录。因此,作为一个整体的 Streams 集群最终会以不一致的全局状态视图结束。这就是您应该禁用保留的原因。

从存储中“删除”旧数据的正确方法是将所需键的墓碑写入输入主题。然后,这将正确传播到集群的所有成员,在恢复期间正确应用,并由代理正确压缩。

我希望这一切都会有所帮助。当然,请加入票务并帮助我们使 API 更直观!

【讨论】:

是的,它确实有很大帮助。感谢您的详细回答:) 澄清“作为状态存储的更改日志的主题不得配置保留。”:这意味着您不应将主题配置为在经过一定时间或在一定时间后过期数据已超过大小阈值。相反,数据应该“永远”保留在主题中,启用压缩有助于确保主题仍然不会超出界限。 我正在寻找解释。非常感谢【参考方案2】:

目前似乎没有办法监听 KGlobalTable 的变化。

您可以使用全局存储和自定义处理器获得类似的结果。

我在这里偶然发现了这个How to be notified about updates to state store of GlobalKTable?

我并不是说这是一个好的用例,但作为一种解决方法,它可能会有所帮助。

【讨论】:

以上是关于Kafka 流式添加全局存储的用例的主要内容,如果未能解决你的问题,请参考以下文章

Linux文件系统的用例建模

pytest 用例编写规则命令行执行用例用例执行的先后顺序

HttpRunner 跳过用例录制生成用例用例分层机制

「事件驱动架构」Kafka vs. RabbitMQ:架构性能和用例

XSL 流在小文档上的用例,而不是提前退出?

设计模式 用例图之二