Kafka Streams KTable 外键连接未按预期工作

Posted

技术标签:

【中文标题】Kafka Streams KTable 外键连接未按预期工作【英文标题】:Kafka Streams KTable foreign key join not working as expected 【发布时间】:2021-12-03 22:02:15 【问题描述】:

我正在尝试在 Kafka Streams 中加入一个简单的外键连接,类似于许多文章(例如:https://www.confluent.io/blog/data-enrichment-with-kafka-streams-foreign-key-joins/)。

当我尝试将用户 id(用户表的主键)与 account_balance 表中的外键 user_id 连接以生成 AccountRecord 对象时,我收到以下错误: [-StreamThread-1] ignJoinSubscriptionSendProcessorSupplier : Skipping record due to null foreign key.

最终目标是在任一表中的任何字段更新时将AccountRecord 传递给主题。问题是当我简单地分别打印用户表和帐户表时,外键和所有字段都被完全填充。我看不出有什么问题或为什么会发生此错误。这是我的代码的 sn-p:

    public void start_test()
        StreamsBuilder builder = new StreamsBuilder();

        KTable<Long, User> userTable = builder.table(USER_TOPIC, Consumed.with(CustomSerdes.UserPKey(), CustomSerdes.User()));
        KTable<Long, AccountBalance> accountBalanceTable = builder.table(ACCOUNT_BALANCE_TOPIC, Consumed.with(CustomSerdes.UserPKey(), CustomSerdes.AccountBalance()));

        final KTable<Long, AccountRecord> accountRecordTable = accountBalanceTable.join(
                userTable,
                AccountBalance::getUserId,
                (account, user) -> new AccountRecord(user.getFirstName(), account.getBalance());
        );

        // print the table
        accountRecordTable
                .toStream()
                .print(Printed.toSysOut());

        KafkaStreams stream = new KafkaStreams(builder.build(), properties);
        stream.start();
    

任何指导都会有所帮助。我没有包含自定义 serde 代码或对象形状,但它们非常简单。如果您需要进一步说明,请告诉我。

谢谢

【问题讨论】:

【参考方案1】:

您的消息是否包含关键记录? KTable 是对 changelog 流的抽象,其中每条数据记录都代表一个更新,通过 key 知道更新的方法,对于当前使用 KTables 来说非常重要的是记录的 key。 例如

AccountBalance<Key=10,Value=accountBalanceId=10,userId=777,balance=10>
User<Key=777, Value=firstName="Panchito">

另一个观察是您的 Serde 键,如果您将 Long 定义为键,为什么要使用自定义 serde?​​p>

KTable<Long, User> userTable = builder.table(USER_TOPIC, Consumed.with(Serdes.Long(), CustomSerdes.User()));

KTable<Long, AccountBalance> accountBalanceTable = builder.table(ACCOUNT_BALANCE_TOPIC, Consumed.with(Serdes.Long(), CustomSerdes.AccountBalance()))

也许您的密钥反序列化程序将密钥作为空值发送。检查您的自定义 Serde 的输出登录输出。 此外,您还必须改进添加物化的 join 方法,因为您正在创建一个新对象,而 Kafka 不知道如何处理新对象。

      final KTable<Long, AccountRecord> accountRecordTable = accountBalanceTable.join(
                    userTable,
                    AccountBalance::getUserId,
                    (account, user) -> new AccountRecord(user.getFirstName(), account.getBalance()),
Materialized.with(Serdes.Long(), CustomSerdes.AccountBalanceSerde() )
            );

尝试使用 JsonSerde 或 Avro 来创建您的自定义 Serdes。

【讨论】:

感谢您的回复,我已将您的答案标记为正确。你说得对,问题出在 serde 上。实际上,问题在于帐户记录的 Json Serializer 自定义 serde。我必须将其配置为处理 snake_case,与反序列化器相同。

以上是关于Kafka Streams KTable 外键连接未按预期工作的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Streams API:KStream 到 KTable

如何发送时间窗口 KTable 的最终 kafka-streams 聚合结果?

Kafka Streams API:避免在 KTable.mapValues 中添加额外的 stateStore

当主题有多个分区时,KTable-KTable 外键连接不会产生所有消息

KSQL KTabke+KTable Join重复结果异常。

Kafka Ktable 还流式传输重复更新