如何根据其他记录的处理结果在Kafka中处理记录?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何根据其他记录的处理结果在Kafka中处理记录?相关的知识,希望对你有一定的参考价值。
我有一个@KafkaListener
类,可侦听特定主题,并使用包含Person
对象或Phone
对象(且其中只有一个)的记录。每个Phone
都有对应的Person
的参考/相关ID。侦听器类执行特定于接收到的类型的某些验证,将该对象保存到数据库中,并产生成功/失败的回传到Kafka的响应,并由另一项服务使用。]
因此Person
可以成功传输而没有任何相应的Phone
,但是Phone
传输应该仅在相应的Person
传输成功后才成功。我无法全神贯注于如何实现这种“同步”,因为Person
和Phone
作为单独的记录独立进入Kafka,并且不能保证与特定Person
对应的Phone
将在Phone
之前进行处理。
给定当前的体系结构,是否有可能进行这样的同步,或者我应该重新设计生产者并发送Person
/ Phone
对作为单独的类型?
谢谢。
答案
尚不清楚您如何针对不同的对象类型使用相同的序列化程序,但您可能应该创建单独的主题和/或将当前主题分为两部分(请参阅Kafka Streams API)
我假设人员少于电话,在这种情况下,您可以根据人员主题构建KTable,然后在获取电话记录时,可以针对该人员ID在此表上进行左联接或查找
[其他解决方案可能涉及使用Kafka Connect将记录转储到可以进行联接的系统中
以上是关于如何根据其他记录的处理结果在Kafka中处理记录?的主要内容,如果未能解决你的问题,请参考以下文章