当主题有多个分区时,KTable-KTable 外键连接不会产生所有消息

Posted

技术标签:

【中文标题】当主题有多个分区时,KTable-KTable 外键连接不会产生所有消息【英文标题】:KTable-KTable foreign-key join not producing all messages when topics have more than one partition 【发布时间】:2020-11-03 03:26:21 【问题描述】:

请参阅下面的更新以显示潜在的解决方法

我们的应用程序使用 2 个主题作为 KTables,执行左连接,并输出到一个主题。在测试过程中,我们发现当我们的输出主题只有 1 个分区时,这可以正常工作。当我们增加分区数量时,我们注意到生成到输出主题的消息数量减少了。

在启动应用程序之前,我们使用多个分区配置测试了这一理论。使用 1 个分区,我们可以看到 100% 的消息。使用 2,我们看到一些消息(少于 50%)。有 10 个,我们几乎看不到任何(不到 10%)。

因为我们要加入,所以从主题 1 消费的每条消息都应该写入我们的输出主题,但我们发现这并没有发生。似乎消息卡在从 Ktable 的外键连接创建的“中间”主题中,但没有错误消息。

任何帮助将不胜感激!

Service.java

@Bean
public BiFunction<KTable<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() 

    return (topicOne, topicTwo) ->
            topicOne
                    .leftJoin(topicTwo,
                            value -> MyOtherKey.newBuilder()
                                    .setFieldA(value.getFieldA())
                                    .setFieldB(value.getFieldB())
                                    .build(),
                            this::enrich)
                    .toStream();

build.gradle

plugins 
    id 'org.springframework.boot' version '2.3.1.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'com.commercehub.gradle.plugin.avro' version '0.9.1'


...

ext 
    set('springCloudVersion', "Hoxton.SR6")


...

implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
implementation 'io.confluent:kafka-streams-avro-serde:5.5.1'

注意:由于 spring-cloud-stream 中包含的版本中存在错误,我们排除了 org.apache.kafka 依赖项

application.yml

spring:
  application:
    name: app-name
    stream:
      bindings:
        process-in-0:
          destination: topic1
          group: $spring.application.name
        process-in-1:
          destination: topic2
          group: $spring.application.name
        process-out-0:
          destination: outputTopic
      kafka:
        streams:
          binder:
            applicationId: $spring.application.name
            brokers: $KAFKA_BROKERS
            configuration:
              commit.interval.ms: 1000
              producer:
                acks: all
                retries: 20
              default:
                key:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
                value:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
            min-partition-count: 2

测试场景:

举个具体的例子,如果我将以下 3 条消息发布到主题 1:

"fieldA": 1, "fieldB": 1,,"fieldA": 1, "fieldB": 1
"fieldA": 2, "fieldB": 2,,"fieldA": 2, "fieldB": 2
"fieldA": 3, "fieldB": 3,,"fieldA": 3, "fieldB": 3
"fieldA": 4, "fieldB": 4,,"fieldA": 4, "fieldB": 4

输出主题只会收到2条消息。

"fieldA": 2, "fieldB": 2,,"fieldA": 2, "fieldB": 2
"fieldA": 3, "fieldB": 3,,"fieldA": 3, "fieldB": 3

另外两个怎么了?似乎某些键/值对无法写入输出主题。重试这些“丢失”的消息也不起作用。

更新:

通过将主题 1 用作 KStream 而不是 KTable 并在继续执行 KTable-KTable 连接之前调用 toTable(),我能够正常运行。我仍然不确定为什么我的原始解决方案不起作用,但希望这种解决方法可以对实际问题有所了解。

@Bean
public BiFunction<KStream<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() 

    return (topicOne, topicTwo) ->
            topicOne
                    .map(...)
                    .toTable()
                    .leftJoin(topicTwo,
                            value -> MyOtherKey.newBuilder()
                                    .setFieldA(value.getFieldA())
                                    .setFieldB(value.getFieldB())
                                    .build(),
                            this::enrich)
                    .toStream();

【问题讨论】:

奇怪的是,使用KSteam#toTable() 会改变任何东西。你可以分享两个程序的拓扑描述来比较它们吗?可以提供一些启示。 @MatthiasJ.Sax 事实证明,KStream#map()KStream#toTable() 的组合是使用多个分区时的诀窍。重申一下,这在 1 个分区上可以正常工作,但是当我们尝试多个分区时,它只有在我们作为 KStream 消费然后通过映射键/值强制它重新分区时才有效。 【参考方案1】:

鉴于问题的描述,似乎(左)KTable 输入主题中的数据未按其键正确分区。对于单个分区的主题,好吧,只有一个分区,所有数据都到这个分区,连接结果就完成了。

但是,对于一个多分区的输入主题,你需要确保数据是按key分区的,否则,具有相同key的两条记录可能最终在不同的分区中,从而连接失败(因为连接完成以每个分区为基础)。

请注意,即使外键连接不要求两个输入主题共同分区,但仍然要求每个输入主题本身都按其键进行分区!

如果您使用map().toTable(),您基本上会触发数据的内部重新分区,以确保数据按键分区,从而解决问题。

【讨论】:

我正在和@Mario P 一起做这个项目。我相信你是对的,这是一个分区问题;我只是不确定左侧主题中的数据将如何被键错误地分区。例如,如果我使用的是 confluent CLI 控制台生产者,我是否需要做任何事情来确保它正确地按键分区?似乎消息被均匀地放在分区之间的左侧主题中。我们能够关注这个tutorial,但是当我们将密钥类型从原始更改为 avro 时,它不再起作用。 从我的脑海中,我会假设控制台生产者确实按键分区 - 但是,Avro 消息(与 Confluent SR 一起)有一个“标题”,它编码模式 ID 和这个标头可能会“弄乱”分区。 -- 你在map() 步骤中具体做了什么? 我们这样做:map(KeyValue::new)。所以我们实际上并没有改变键或值,只是重新实例化。 如果您为活页夹使用自定义分区器,您是否也将这些自定义分区器传递给 Kafka Streams? -- 您需要重新配置内部 Producer 以使用相同的分区器,否则,内部外键连接订阅/响应主题将使用不同的分区 -- 或确保您在活页夹中使用 DefaultPartitioner 您绝对正确,我们的问题与密钥的分区有关。我们稍微改变了架构,所以现在我们加入了来自生产者的 KTables,它们都使用 kafka-streams-binder。看来我们之前的架构是有问题的,因为 kafka-binder 和 kafka-streams-binder 的默认分区策略必须略有不同。我将选择您的帖子作为答案,因为它提供了对我们问题根本原因的一些见解。谢谢@MatthiasJ.Sax【参考方案2】:

选择加入主题的键可能会有所帮助。主题的分区配置应该相同。

return (topicOne, topicTwo) ->
        topicOne
            .leftJoin(topicTwo,
                value -> MyOtherKey.newBuilder()
                    .setFieldA(value.getFieldA())
                    .setFieldB(value.getFieldB())
                    .build(),
                this::enrich)
            .toStream().selectKey((key, value) -> key);

【讨论】:

感谢您的评论。我按照建议尝试了 selectKey ,但这并没有改变行为。无论出于何种原因,某些消息无论如何都会被丢弃。为了清楚起见,我将在我的原始帖子中添加一些测试场景。 您实现的两个主题的分区大小是否相同?此外,您的行为是否会因您是输出到应用程序还是控制台消费者而有所不同? 是的,每个主题都有相同数量的分区。唯一能正常工作的数字是 1,而且每增加一个分区似乎都会增加删除记录的机会。无论我们使用控制中心浏览主题、从应用程序输出还是使用任何其他方式,行为都不会改变。【参考方案3】:

这是一个奇怪的问题,我从未听说过多个输出主题分区控制数据写入频率。但是我知道toStream() 仅在缓存已满时才会将数据写入下游,因此请尝试设置cache.max.bytes.buffering = 0。 此外,KTable 仅保留每个键的最新记录,因此如果您对同一个键有多个值,则只有最新值会保留并写入下游。

【讨论】:

我试了一下,但没有成功。这绝对是一个奇怪的问题,所以我在帖子中添加了一个测试场景来澄清发生了什么。感谢您的评论。

以上是关于当主题有多个分区时,KTable-KTable 外键连接不会产生所有消息的主要内容,如果未能解决你的问题,请参考以下文章

Kafka数据消费

KafkaKafka日志管理

Amazon Kinesis Streams - 每个分片有多个“主题”?

多个分区的 kafka 流式传输行为

Kafka 分区机制详解

2、kafka如何选定分区数量