spring-kafka 固定事件迁移实现

Posted

技术标签:

【中文标题】spring-kafka 固定事件迁移实现【英文标题】:spring-kafka fixtured event migration implementation 【发布时间】:2022-01-20 06:15:34 【问题描述】:

我有几项服务,其中一项是事实来源 (SOT)。 Kafka 是他们的消息代理。有时我需要生成一组将在其他服务中使用和应用的事件。所谓的固定事件迁移。

我的夹具文件示例:

EntityUpdated (topicA)
- id
- relation

RelationUpdated (topicB)
- id
- relation

并且类是应用事件后在数据库中具有投影的弹簧实体。

class Entity: Model 
  id
  val relation: Relation


class Relation: Model 
  id

当前的消费者实现以任意方式读取主题,消费者可以在 topicA 之前从 topicB 读取数据,并且我遇到由于相关实体尚不存在而无法应用消息的情况。 (在 EntityUpdated 之前使用 RelationUpdated)。

我有几个想法来解决它:

    暂停所有分区/主题并按指定顺序恢复。所以我可以避免案例RelationUpdated consumed before EntityUpdated。然后在恢复所有主题的所有分区后,我可以继续以任意方式工作。我不喜欢切换,但它看起来很有效。

    将无法应用的消息放入所谓的死信队列,并尝试一次又一次地重播,直到它们都被应用。

也许有人会做类似的事情。我很高兴知道您的想法。

【问题讨论】:

【参考方案1】:

这不是 Apache Kafka 的设计目的。更清楚地说,消息传递是关于独立性和关注点分离的。我的意思是,一个来源中的消息不应该影响其他消息,而您希望通过暂停和 DLT 做什么对其余的主题非常有影响。

我建议你看看 Spring Integration 及其聚合器模式实现:https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator。这个想法是:您的EntityRelationid 有相关性。因此,独立于谁先到达,聚合器将等待第二部分,只有在那之后才会释放它们以进行下一个处理步骤。有了这个集成解决方案,就不需要消费者暂停等任何事情,也不需要通过 DLT 进行额外的代理轮次。

如果 Spring Integration 对您来说太复杂,您可以考虑使用本地 Map 实现自己的解决方案,以添加基于第一次到达 (computeIfAbsent()) 并返回第二次到达 (computeIfPresent()) .

【讨论】:

谢谢。聚合器和弹簧集成对我来说很有意义

以上是关于spring-kafka 固定事件迁移实现的主要内容,如果未能解决你的问题,请参考以下文章

vue监听滚动事件 实现某元素吸顶或者固定位置显示

C#固定时间执行指定事件(观察者模式+异步委托)

Facebook 的“无时区事件”迁移

reactjs固定数据表中的无限滚动

将从 Fabric 创建的自定义事件迁移到 Firebase?

监听页面滚动事件