spring-kafka 固定事件迁移实现
Posted
技术标签:
【中文标题】spring-kafka 固定事件迁移实现【英文标题】:spring-kafka fixtured event migration implementation 【发布时间】:2022-01-20 06:15:34 【问题描述】:我有几项服务,其中一项是事实来源 (SOT)。 Kafka 是他们的消息代理。有时我需要生成一组将在其他服务中使用和应用的事件。所谓的固定事件迁移。
我的夹具文件示例:
EntityUpdated (topicA)
- id
- relation
RelationUpdated (topicB)
- id
- relation
并且类是应用事件后在数据库中具有投影的弹簧实体。
class Entity: Model
id
val relation: Relation
class Relation: Model
id
当前的消费者实现以任意方式读取主题,消费者可以在 topicA 之前从 topicB 读取数据,并且我遇到由于相关实体尚不存在而无法应用消息的情况。 (在 EntityUpdated 之前使用 RelationUpdated)。
我有几个想法来解决它:
暂停所有分区/主题并按指定顺序恢复。所以我可以避免案例RelationUpdated consumed before EntityUpdated
。然后在恢复所有主题的所有分区后,我可以继续以任意方式工作。我不喜欢切换,但它看起来很有效。
将无法应用的消息放入所谓的死信队列,并尝试一次又一次地重播,直到它们都被应用。
也许有人会做类似的事情。我很高兴知道您的想法。
【问题讨论】:
【参考方案1】:这不是 Apache Kafka 的设计目的。更清楚地说,消息传递是关于独立性和关注点分离的。我的意思是,一个来源中的消息不应该影响其他消息,而您希望通过暂停和 DLT 做什么对其余的主题非常有影响。
我建议你看看 Spring Integration 及其聚合器模式实现:https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator。这个想法是:您的Entity
和Relation
与id
有相关性。因此,独立于谁先到达,聚合器将等待第二部分,只有在那之后才会释放它们以进行下一个处理步骤。有了这个集成解决方案,就不需要消费者暂停等任何事情,也不需要通过 DLT 进行额外的代理轮次。
如果 Spring Integration 对您来说太复杂,您可以考虑使用本地 Map
实现自己的解决方案,以添加基于第一次到达 (computeIfAbsent()
) 并返回第二次到达 (computeIfPresent()
) .
【讨论】:
谢谢。聚合器和弹簧集成对我来说很有意义以上是关于spring-kafka 固定事件迁移实现的主要内容,如果未能解决你的问题,请参考以下文章