使用 Kafka 的 Streams API 处理错误消息

Posted

技术标签:

【中文标题】使用 Kafka 的 Streams API 处理错误消息【英文标题】:Handling bad messages using Kafka's Streams API 【发布时间】:2017-07-28 18:01:03 【问题描述】:

我有一个基本的流处理流程,看起来像

master topic -> my processing in a mapper/filter -> output topics

我想知道处理“坏消息”的最佳方法。这可能是我无法正确反序列化的消息之类的事情,或者处理/过滤逻辑可能以某种意想不到的方式失败(我没有外部依赖项,因此不应该出现此类暂时性错误)。

我正在考虑将我的所有处理/过滤代码包装在 try catch 中,如果引发异常,则路由到“错误主题”。然后我可以研究该消息并根据需要修改或修复我的代码,然后将其重播以掌握。如果我让任何异常传播,则流似乎会被阻塞,并且不会接收到更多消息。

这种方法是否被认为是最佳做法? 是否有方便的 Kafka 流方式来处理这个问题?我认为没有 DLQ 的概念... 有哪些替代方法可以阻止 Kafka 干扰“坏消息”? 有哪些替代错误处理方法?

为了完整起见,这里是我的代码(伪 ish):

class Document 
    // Fields


class AnalysedDocument 

    Document document;
    String rawValue;
    Exception exception;
    Analysis analysis;

    // All being well
    AnalysedDocument(Document document, Analysis analysis) ...

    // Analysis failed
    AnalysedDocument(Document document, Exception exception) ...

    // Deserialisation failed
    AnalysedDocument(String rawValue, Exception exception) ...


KStreamBuilder builder = new KStreamBuilder();
KStream<String, AnalysedPolecatDocument> analysedDocumentStream = builder
    .stream(Serdes.String(), Serdes.String(), "master")
    .mapValues(new ValueMapper<String, AnalysedDocument>() 
         @Override
         public AnalysedDocument apply(String rawValue) 
             Document document;
             try 
                 // Deserialise
                 document = ...
              catch (Exception e) 
                 return new AnalysedDocument(rawValue, exception);
             
             try 
                 // Perform analysis
                 Analysis analysis = ...
                 return new AnalysedDocument(document, analysis);
              catch (Exception e) 
                 return new AnalysedDocument(document, exception);
             
         
    );

// Branch based on whether analysis mapping failed to produce errorStream and successStream
errorStream.to(Serdes.String(), customPojoSerde(), "error");
successStream.to(Serdes.String(), customPojoSerde(), "analysed");

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

非常感谢任何帮助。

【问题讨论】:

1> quarantine topic 方法似乎有风险,因为糟糕的生产者可能会导致高开销,特别是如果该主题的多个消费者一直忙于将相同的格式错误的消息推送到该隔离主题 2> @987654324 @ 方法听起来更直观,并且可以使用 KStream doubled = input.flatMap( .. 验证 kv 的反序列化来最小化潜在的重新分区开销,并且具有必须反序列化的缺点(这次安全)再次密钥;因为密钥的(反序列化)成本远低于值的成本 【参考方案1】:

如果您想将异常(自定义异常)发送到另一个主题(ERROR_TOPIC_NAME):

@Bean
  public KStream<String, ?> kafkaStreamInput(StreamsBuilder kStreamBuilder) 
    KStream<String, InputModel> input = kStreamBuilder.stream(INPUT_TOPIC_NAME);
    return service.messageHandler(input);
  

public KStream<String, ?> messageHandler(KStream<String, InputModel> inputTopic) 
    KStream<String, Object> output;
    output = inputTopic.mapValues(v -> 
      try 
        //return InputModel
        return normalMethod(v);
       catch (Exception e) 
        //return ErrorModel
        return errorHandler(e);
      
    );
  
    output.filter((k, v) -> (v instanceof ErrorModel)).to(KafkaStreamsConfig.ERROR_TOPIC_NAME);
    output.filter((k, v) -> (v instanceof InputModel)).to(KafkaStreamsConfig.OUTPUT_TOPIC_NAME);

    return output;
  

如果你想处理 Kafka 异常并跳过它:

@Autowired
  public ConsumerErrorHandler(
      KafkaProducer<String, ErrorModel> dlqProducer) 
    this.dlqProducer = dlqProducer;
  

  @Bean
  ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
      ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
      ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) 
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory.getIfAvailable());
    factory.setErrorHandler(((exception, data) -> 

      ErrorModel errorModel = ErrorModel.builder().message()
          .status("500").build();
      assert data != null;
        dlqProducer.send(new ProducerRecord<>(DLQ_TOPIC, data.key().toString(), errorModel));
    ));
    return factory;
  

【讨论】:

您的答案可以通过额外的支持信息得到改进。请edit 添加更多详细信息,例如引用或文档,以便其他人可以确认您的答案是正确的。你可以找到更多关于如何写好答案的信息in the help center。【参考方案2】:

对于 处理 逻辑,您可以采用这种方法:

someKStream 

    .mapValues(inputValue -> 
        // for each execution the below "return" could provide a different class than the previous run!
        // e.g. "return isFailedProcessing ? failValue : successValue;" 
        // where failValue and successValue have no related classes
        return someObject; // someObject class vary at runtime depending on your business
    ) // here you'll have KStream<whateverKeyClass, Object> -> yes, Object for the value!

    // you could have a different logic for choosing  
    // the target topic, below is just an example
    .to((k, v, recordContext) -> v instanceof failValueClass ?
            "dead-letter-topic" : "success-topic",
            // you could completelly ignore the "Produced" part 
            // and rely on spring-boot properties only, e.g. 
            // spring.kafka.streams.properties.default.key.serde=yourKeySerde
            // spring.kafka.streams.properties.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde
            Produced.with(yourKeySerde, 
                            // JsonSerde could be an instance configured as you need 
                            // (with type mappings or headers setting disabled, etc)
                            new JsonSerde<>())); 

您的课程虽然不同并且涉及不同的主题,但将按预期进行序列化。

当不使用to(),而是想继续其他处理时,可以使用branch(),根据kafka-value类进行逻辑拆分; branch() 的诀窍是返回 KStream&lt;keyClass, ?&gt;[],以便进一步允许将单个数组项转换为适当的类。

【讨论】:

【参考方案3】:

我认为这些示例在使用 Avro 时根本不起作用。

当架构无法解析时(例如,有损坏主题的错误/非 avro 消息),首先没有要反序列化的 keyvalue,因为到那时 DSL .branch() 代码被调用,异常已经被抛出(或处理)。

谁能确认我是否确实如此?使用 Avro 时,您在此处提到的非常流畅的方法是不可能的?

KIP-161 确实解释了如何使用处理程序,但是,将其视为拓扑的一部分会更加流畅。

【讨论】:

【参考方案4】:

2018 年 3 月 23 日更新: Kafka 1.0 通过KIP-161 提供了比我在下面描述的更好、更容易处理错误消息(“毒丸”)的方法.请参阅 Kafka 1.0 文档中的 default.deserialization.exception.handler。

这可能是我无法正确反序列化的消息之类的内容 [...]

好的,我的回答集中在(反)序列化问题上,因为对于大多数用户来说,这可能是最棘手的情况。

[...] 或者处理/过滤逻辑可能以某种意想不到的方式失败(我没有外部依赖项,因此不应该出现此类暂时性错误)。

同样的想法(对于反序列化)也可以应用于处理逻辑中的故障。在这里,大多数人倾向于下面的选项 2(减去反序列化部分),但 YMMV。

我正在考虑将我的所有处理/过滤代码包装在 try catch 中,如果引发异常,则路由到“错误主题”。然后我可以研究该消息并根据需要修改或修复我的代码,然后将其重播以掌握。如果我让任何异常传播,则流似乎会被阻塞,并且不会接收到更多消息。

这种方法是否被认为是最佳做法?

是的,目前这是要走的路。本质上,两种最常见的模式是 (1) 跳过损坏的消息或 (2) 将损坏的记录发送到隔离主题,即死信队列。

是否有方便的 Kafka 流方式来处理这个问题?我认为没有 DLQ 的概念...

是的,有一种方法可以解决这个问题,包括使用死信队列。但是,它(至少恕我直言)还没有那么方便。如果您对 API 应如何处理此问题有任何反馈 - 例如通过新的或更新的方法、配置设置(“如果序列化/反序列化失败,将有问题的记录发送到此隔离主题”)——请告诉我们。 :-)

有哪些替代方法可以阻止 Kafka 干扰“坏消息”? 有哪些替代错误处理方法?

请参阅下面的示例。

FWIW,Kafka 社区也在讨论添加一个新的 CLI 工具,允许您跳过损坏的消息。但是,作为 Kafka Streams API 的用户,我认为理想情况下您希望直接在代码中处理此类场景,并且仅作为最后的手段回退到 CLI 实用程序。

以下是 Kafka Streams DSL 处理损坏的记录/消息(即“毒丸”)的一些模式。这取自http://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages

选项 1:使用 flatMap 跳过损坏的记录

这可以说是大多数用户想要做的。

我们使用flatMap,因为它允许您为每个输入记录输出零个、一个或多个输出记录。在记录损坏的情况下,我们不输出任何内容(零记录),从而忽略/跳过损坏的记录。 与此处列出的其他方法相比,此方法的优势:我们只需手动反序列化记录一次! 这种方法的缺点:flatMap“标记”输入流以进行潜在的数据重新分区,即如果您执行基于键的操作,例如分组 (groupBy/groupByKey) 或之后加入,您的数据将在后台重新分区。由于这可能是一个代价高昂的步骤,我们不希望这种情况不必要地发生。如果您知道记录键始终有效或者您不需要对键进行操作(因此将它们保留为byte[] 格式的“原始”键),您可以从flatMap 更改为flatMapValues,即使您稍后加入/分组/聚合流,也不会导致数据重新分区。

代码示例:

Serde<byte[]> bytesSerde = Serdes.ByteArray();
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();

// Input topic, which might contain corrupted messages
KStream<byte[], byte[]> input = builder.stream(bytesSerde, bytesSerde, inputTopic);

// Note how the returned stream is of type KStream<String, Long>,
// rather than KStream<byte[], byte[]>.
KStream<String, Long> doubled = input.flatMap(
    (k, v) -> 
      try 
        // Attempt deserialization
        String key = stringSerde.deserializer().deserialize(inputTopic, k);
        long value = longSerde.deserializer().deserialize(inputTopic, v);

        // Ok, the record is valid (not corrupted).  Let's take the
        // opportunity to also process the record in some way so that
        // we haven't paid the deserialization cost just for "poison pill"
        // checking.
        return Collections.singletonList(KeyValue.pair(key, 2 * value));
      
      catch (SerializationException e) 
        // log + ignore/skip the corrupted message
        System.err.println("Could not deserialize record: " + e.getMessage());
      
      return Collections.emptyList();
    
);

选项 2:branch 的死信队列

与选项 1(忽略损坏的记录)相比,选项 2 通过将损坏的消息从“主”输入流中过滤出来并将它们写入隔离主题(想想:死信队列)来保留损坏的消息。缺点是,对于有效记录,我们必须支付两次手动反序列化成本。

KStream<byte[], byte[]> input = ...;

KStream<byte[], byte[]>[] partitioned = input.branch(
    (k, v) -> 
      boolean isValidRecord = false;
      try 
        stringSerde.deserializer().deserialize(inputTopic, k);
        longSerde.deserializer().deserialize(inputTopic, v);
        isValidRecord = true;
      
      catch (SerializationException ignored) 
      return isValidRecord;
    ,
    (k, v) -> true
);

// partitioned[0] is the KStream<byte[], byte[]> that contains
// only valid records.  partitioned[1] contains only corrupted
// records and thus acts as a "dead letter queue".
KStream<String, Long> doubled = partitioned[0].map(
    (key, value) -> KeyValue.pair(
        // Must deserialize a second time unfortunately.
        stringSerde.deserializer().deserialize(inputTopic, key),
        2 * longSerde.deserializer().deserialize(inputTopic, value)));

// Don't forget to actually write the dead letter queue back to Kafka!
partitioned[1].to(Serdes.ByteArray(), Serdes.ByteArray(), "quarantine-topic");

选项 3:使用 filter 跳过损坏的记录

我只是为了完整性而提到这一点。此选项看起来像是选项 1 和 2 的混合,但比其中任何一个都差。与选项 1 相比,您必须为有效记录支付两次手动反序列化成本(糟糕!)。与选项 2 相比,您无法在死信队列中保留损坏的记录。

KStream<byte[], byte[]> validRecordsOnly = input.filter(
    (k, v) -> 
      boolean isValidRecord = false;
      try 
        bytesSerde.deserializer().deserialize(inputTopic, k);
        longSerde.deserializer().deserialize(inputTopic, v);
        isValidRecord = true;
      
      catch (SerializationException e) 
        // log + ignore/skip the corrupted message
        System.err.println("Could not deserialize record: " + e.getMessage());
      
      return isValidRecord;
    
);
KStream<String, Long> doubled = validRecordsOnly.map(
    (key, value) -> KeyValue.pair(
        // Must deserialize a second time unfortunately.
        stringSerde.deserializer().deserialize(inputTopic, key),
        2 * longSerde.deserializer().deserialize(inputTopic, value)));

非常感谢任何帮助。

我希望我能帮上忙。如果是,我会很感激您对我们如何改进 Kafka Streams API 以比现在更好/更方便的方式处理故障/异常的反馈。 :-)

【讨论】:

您好 Miguno,我认为一般来说让用户自己处理错误是有意义的,并且提供了似乎是库理念核心的灵活性。我猜一些示例或不同解决方案的食谱可能会很好。 请您解释一下您对选项 1 重新分区的评论?如果我将结果流(在您的示例中为“加倍”)写入不同的主题,这种重新分区开销是否仍然会发生? 仅当您执行基于键的操作(例如 groupByleftJoin)时才会发生重新分区。 明白。因此,如果只是将过滤后的流路由到其他地方,这不是问题。完美的。我想我会使用 1 和 2 的组合。继续努力。 对于选项 3,您不能通过使用映射到的信封对象来避免双重反序列化。信封对象将包含一个布尔值,指示消息的有效性,如果有效,则包含实际的反序列化消息本身。我认为这将提供一个很好的通用解决方案。【参考方案5】:

目前,Kafka Streams 仅提供有限的错误处理功能。正在进行工作以简化这一点。目前,您的整体方法似乎是一条不错的路。

关于处理反序列化/序列化错误的一条评论:手动处理这些错误,需要您“手动”进行反序列化/序列化。这意味着,您需要为您的 Streams 应用程序的输入/输出主题配置 ByteArraySerdes 的键和值,并添加一个执行反序列化/序列化的 map()(即,KStream&lt;byte[],byte[]&gt; -&gt; map() -&gt; KStream&lt;keyType,valueType&gt; - 或相反如果您还想捕获序列化异常)。否则不能try-catch反序列化异常。

使用您当前的方法,您“仅”验证给定字符串是否代表有效文档 - 但可能是消息本身已损坏并且无法在源运算符中转换为 String第一名。因此,您的代码实际上并没有涵盖反序列化异常。但是,如果您确定永远不会发生反序列化异常,那么您的方法也足够了。

更新

此问题已通过KIP-161 解决,并将包含在下一个版本 1.0.0 中。它允许您通过参数default.deserialization.exception.handler 注册回调。每次反序列化期间发生异常时都会调用该处理程序,并允许您返回 DeserializationResponseCONTINUE -> 删除记录并继续前进,或默认为 FAIL)。

更新 2

使用KIP-210(将成为 Kafka 1.1 的一部分)也可以在生产者端处理错误,类似于消费者部分,通过通过配置 default.production.exception.handler 注册一个 ProductionExceptionHandler 可以返回 CONTINUE .

【讨论】:

Matthias 一如既往的出色回答。很高兴知道我在正确的轨道上。我将进行建议的更改。我想我还应该包括一个“安全映射器”来访问字节数组(而不是在“to”语句中使用自定义 serde)。 KStream -> map() -> KStream -> KStream 我想指出 ProductionExceptionHandler 不处理业务逻辑异常,但只处理实际的 kafka 生产者面临的问题。

以上是关于使用 Kafka 的 Streams API 处理错误消息的主要内容,如果未能解决你的问题,请参考以下文章

Kafka 消费者 API 上的 Kafka Streams DSL

Kafka Streams API:KStream 到 KTable

[翻译和注解]Kafka Streams简介: 让流处理变得更简单

使用 Kafka Streams DSL 时如何处理错误和不提交

用于事件过滤的 Kafka Consumer API 与 Streams API

使用Kafka Streams处理复杂的Avro消息