如何忽略从同一主题读取和写入不同事件类型的 Kafka Streams 应用程序中的某些类型的消息

Posted

技术标签:

【中文标题】如何忽略从同一主题读取和写入不同事件类型的 Kafka Streams 应用程序中的某些类型的消息【英文标题】:How to ignore some kinds of messages in a Kafka Streams Application that reads and writes different event types from the same topic 【发布时间】:2019-09-07 22:01:06 【问题描述】:

假设 Spring Cloud Stream 应用程序从 order topic 创建一个 KStream。它对OrderCreated "id":x, "productId": y, "customerId": z 事件感兴趣。一旦到达,它就会对其进行处理并生成一个输出事件OrderShipped "id":x, "productId": y, "customerName": <, "customerAddress": z 到相同的order topic

我面临的问题是,由于它从/向同一个主题读取和写入,Kafka Stream 应用程序正在尝试处理自己的写入,这没有意义。

如何阻止此应用程序处理它生成的事件?

更新:正如 Artem Bilan 和 sobychako 指出的那样,我曾考虑使用 KStream.filter(),但有一些细节让我怀疑如何处理这个问题:

现在 KStream 应用程序如下所示:

interface ShippingKStreamProcessor 
    ...
    @Input("order")
    fun order(): KStream<String, OrderCreated>

    @Output("output")
    fun output(): KStream<String, OrderShipped>

KStream 配置

    @StreamListener
    @SendTo("output")
    fun process(..., @Input("order") order: KStream<Int, OrderCreated>): KStream<Int, OrderShipped> 

订单和输出绑定都指向作为目的地的订单主题。

OrderCreated 类:

data class OrderCreated(var id: Int?, var productId: Int?, var customerId: Int?) 
    constructor() : this(null, null, null)

OrderShipped 类

data class OrderShipped(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) 
    constructor() : this(null, null, null, null)

我使用 JSON 作为消息格式,因此消息如下所示:

输入 - 已创建订单:"id":1, "productId": 7,"customerId": 20 输出 - 已发货:"id":1, "productId": 7, "customerName": "X", "customerAddress": "Y"

考虑到这一点,我正在寻找过滤掉不需要的消息的最佳方法

如果我现在只使用KStream.filter(),当我得到"id":1, "productId": 7, "customerName": "X", "customerAddress": "Y" 时,我的KStream&lt;Int, OrderCreated&gt; 会将OrderShipped 事件解组为带有一些空字段的OrderCreated 对象:OrderCreated(id:1, productId: 7, customerId: null)。检查空字段听起来并不可靠。

可能的解决方案可以是向使用该主题的每种消息/类添加另一个字段 eventType = OrderCreated|OrderShipped。即使在这种情况下,我最终也会拥有一个带有 eventType=OrderShipped 属性的 OrderCreated 类(请记住 KStream)。 这看起来像一个丑陋的解决方法。有什么改进的办法吗?

还有另一种更自动的方法来处理这个问题吗?例如,如果消息不符合预期的模式 (OrderCreated),另一种序列化 (AVRO?) 会阻止消息被处理吗? 根据这篇文章,这种在同一主题中支持多个模式(事件类型)的方式似乎是一种很好的做法:https://www.confluent.io/blog/put-several-event-types-kafka-topic/ 但是不清楚如何解组/反序列化不同的类型。

【问题讨论】:

为什么KStream.filter() 不适合你?由于所有内容都在 Kafka 主题中,因此这些 OrderShipped 仍然可供该主题的其他消费者使用。 正如@ArtemBilan 提到的,这应该是可以由filter 控制的东西。如果您可以分享更多代码,我们可以看看。 我已经用更多细节更新了问题 【参考方案1】:

我已接受布鲁诺的回答作为解决此问题的有效方法。不过,我想我想出了一种更直接/合乎逻辑的方式,使用带有JsonTypeInfo 注释的事件层次结构。

首先,您需要一个 Order 事件的基类并指定所有子类。请注意,将有一个类型属性添加到 JSON 文档中,这将有助于 Jackson 编组/解组 DTO:

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
@JsonSubTypes(value = [
    JsonSubTypes.Type(value = OrderCreatedEvent::class, name = "orderCreated"),
    JsonSubTypes.Type(value = OrderShippedEvent::class, name = "orderShipped")
])
abstract class OrderEvent

data class OrderCreatedEvent(var id: Int?, var productId: Int?, var customerId: Int?) : OrderEvent() 
    constructor() : this(null, null, null)


data class OrderShippedEvent(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) : OrderEvent () 
    constructor() : this(null, null, null, null)

这样,OrderCreatedEvent 对象的生产者将生成如下消息:

key: 1 value: "type":"orderCreated","id":1,"productId":24,"customerId":1

现在轮到 KStream 了。我已将签名更改为KStream&lt;Int, OrderEvent&gt;,因为它可以接收 OrderCreatedEvent 或 OrderShippedEvent。在接下来的两行中......

orderEvent.filter  _, value -> value is OrderCreatedEvent 
                .map  key, value -> KeyValue(key, value as OrderCreatedEvent) 

...我过滤以仅保留 OrderCreatedEvent 类的消息并将它们映射以将 KStream&lt;Int, OrderEvent&gt; 转换为 KStream&lt;Int, OrderCreatedEvent&gt;

完整的 KStream 逻辑:

@StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<Int, Customer>, @Input("order") orderEvent: KStream<Int, OrderEvent>): KStream<Int, OrderShippedEvent> 

        val intSerde = Serdes.IntegerSerde()
        val customerSerde = JsonSerde<Customer>(Customer::class.java)
        val orderCreatedSerde = JsonSerde<OrderCreatedEvent>(OrderCreatedEvent::class.java)

        val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
                Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
                        .withKeySerde(intSerde)
                        .withValueSerde(customerSerde)

        val customerTable: KTable<Int, Customer> = input.groupByKey(Serialized.with(intSerde, customerSerde))
                .reduce( _, y -> y , stateStore)


        return (orderEvent.filter  _, value -> value is OrderCreatedEvent 
                .map  key, value -> KeyValue(key, value as OrderCreatedEvent) 
                .selectKey  _, value -> value.customerId  as KStream<Int, OrderCreatedEvent>)
                .join(customerTable,  orderIt, customer ->
                    OrderShippedEvent(orderIt.id, orderIt.productId, customer.name, customer.address)
                , Joined.with(intSerde, orderCreatedSerde, customerSerde))
                .selectKey  _, value -> value.id 
                //.to("order", Produced.with(intSerde, orderShippedSerde))
    

在此过程之后,我将在订单主题中生成一条新消息 key: 1 value: "type":"orderShipped","id":1,"productId":24,"customerName":"Anna","customerAddress":"Cipress Street",但这将被流过滤掉。

【讨论】:

如果所有事件类型都已知,这对我来说很好。但是,有什么建议如何处理/忽略未知类型的事件?例如,可能有一个 OrderDeletedEvent 应该被消费者服务忽略,因为它不相关。在这种情况下,我不想包含额外的、不必要的 OrderDeletedEvent 类。一种解决方案是记录错误并通过配置org.apache.kafka.streams.errors.LogAndContinueExceptionHandler 继续处理,但这意味着所有反序列化异常都将被忽略,这可能是不可取的。【参考方案2】:

您可以使用 Kafka 的记录头来存储记录的类型。见KIP-82。您可以在ProducerRecord 中设置标题。

处理如下:

    从主题中读取KStream&lt;Integer, Bytes&gt; 类型的stream,其值为serde Serdes.BytesSerde

    使用KStream#transformValues() 过滤和创建对象。更具体地说,在transformValues() 中,您可以访问ProcessorContext,它使您可以访问包含有关记录类型信息的记录标题。那么:

    如果类型为OrderShipped,则返回null。 否则从Bytes 对象创建一个OrderCreated 对象并返回它。

对于 AVRO 的解决方案,您可能需要查看以下文档

https://docs.confluent.io/current/streams/developer-guide/datatypes.html https://docs.confluent.io/current/schema-registry/serializer-formatter.html

【讨论】:

我认为您的解决方案可以正常工作,但我对此表示怀疑,transformValues(ValueTransformerSupplier super V,? extends VR> valueTransformerSupplier, java.lang.String... stateStoreNames) 是有状态的操作,它需要在转换方法中的状态存储名称,是否需要传递存储名称或者可以省略... 状态存储可以省略。

以上是关于如何忽略从同一主题读取和写入不同事件类型的 Kafka Streams 应用程序中的某些类型的消息的主要内容,如果未能解决你的问题,请参考以下文章

kafka主题分区的数量和数据中不同键的数量

“无法识别的类型'员工'。忽略。C:/ .....”从.xml读取输入并将输出写入.xls文件+ perl时出错

使用 Spark Structured Streaming 从多个 Kafka 主题读取并写入不同接收器的最佳方式是啥?

应该在同一个Kafka主题中放入几种事件类型吗?

Kafka实践:到底该不该把不同类型的消息放在同一个主题中?

如何从同一生产者向不同的 Kafka 主题和模式注册表生成消息