如何根据其他记录的处理结果在Kafka中处理记录?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何根据其他记录的处理结果在Kafka中处理记录?相关的知识,希望对你有一定的参考价值。

我有一个@KafkaListener类,可侦听特定主题,并使用包含Person对象或Phone对象(且其中只有一个)的记录。每个Phone都有对应的Person的参考/相关ID。侦听器类执行特定于接收到的类型的某些验证,将该对象保存到数据库中,并产生成功/失败的回传到Kafka的响应,并由另一项服务使用。]

因此Person可以成功传输而没有任何相应的Phone,但是Phone传输应该仅在相应的Person传输成功后才成功。我无法全神贯注于如何实现这种“同步”,因为PersonPhone作为单独的记录独立进入Kafka,并且不能保证与特定Person对应的Phone将在Phone之前进行处理。

给定当前的体系结构,是否有可能进行这样的同步,或者我应该重新设计生产者并发送Person / Phone对作为单独的类型?

谢谢。

答案

尚不清楚您如何针对不同的对象类型使用相同的序列化程序,但您可能应该创建单独的主题和/或将当前主题分为两部分(请参阅Kafka Streams API)

我假设人员少于电话,在这种情况下,您可以根据人员主题构建KTable,然后在获取电话记录时,可以针对该人员ID在此表上进行左联接或查找

[其他解决方案可能涉及使用Kafka Connect将记录转储到可以进行联接的系统中

以上是关于如何根据其他记录的处理结果在Kafka中处理记录?的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Streams 是不是适合触发记录的批处理?

如何仅打印kafka记录值,而不是所有其他数据?

如何使用kafka流处理块/批处理数据?

处理大并发量订单处理的 KafKa部署总结

Java处理excel根据某列的值查询,并将结果显示在其他列中

MySql根据字段名查询重复记录并删除!只保留一条