删除 Kafka StateStore 中的记录不起作用(在 .delete(key) 上抛出 NullPointerException)

Posted

技术标签:

【中文标题】删除 Kafka StateStore 中的记录不起作用(在 .delete(key) 上抛出 NullPointerException)【英文标题】:Deleting record in Kafka StateStore does not work (NullPointerException thrown on .delete(key)) 【发布时间】:2020-06-25 17:49:17 【问题描述】:

我的代码中有一个具体化的内存状态存储。我有另一个单独的流,它应该根据某些标准查找和删除记录。

我需要允许我的流访问和删除先前构建的 statestore 中的记录。 我在下面有以下代码

@bean
public StreamBuilder myStreamCodeBean(StreamBuilder streamBuilder) 
    //create store supplier
    KeyValueBytesStoreSupplier myStoreSupplier = Stores.inMemoryKeyValueStore("MyStateStore");

    //materialize statstore and enable caching
    Materialized materializedStore = Materialized.<String, MyObject>as(myStoreSupplier)
            .withKeySerde(Serdes.String())
            .withValueSerde(myObjectSerde)
            .withCachingEnabled();

    //other code here that creates KTable, and another stream to consume records into ktable from another topic
    //........

    //another stream that consumes another topic and deletes records in store with some logic
    streamsBuilder
        .stream("someTopicName", someConsumerObject)
        .filter((key, value) -> 
            KeyValueStore<Bytes, byte[]> kvStore = myStoreSupplier.get();
            kvStore.delete(key);  //StateStore never "open" and this throws nullpointerexception (even tho key is NOT null)
            return true;
        
        .to("some topic name here", producerObject);
    return streamBuilder;

抛出的错误非常普遍。错误是 Kafka 流未运行。

在进行一些调试时,我发现我的 statestore 在删除时没有“打开”。

我在这里做错了什么?我可以使用 ReadOnlyKeyValueStore 读取记录,但我需要删除,所以我不能使用它。

任何帮助表示赞赏。

【问题讨论】:

在您的代码中,您何时使用 materializedStore ?你能显示完整的代码吗? 您无法访问filter 中的商店——您需要使用transform()(或类似名称)并将商店添加到transform() @Yannick 我使用 materializedStore 变量来实现我的 ktable。 @matthias 我会尝试使用 transform(),不知道你不能在过滤器中访问它 正如filter 的JavaDocs 解释:This is a stateless record-by-record operation. :) 【参考方案1】:

必须通过处理器的上下文访问状态存储,而不是使用供应商对象。

创建商店后,您需要确保您尝试从中访问商店的处理器可以访问它。


如果您的商店是本地商店,那么您需要指定哪些处理器将访问该商店。

如果您的存储是全局存储,那么拓扑中的所有处理器都可以访问它。


您正在使用streamsBuilder.stream()至少从您发布的代码中创建一个流,您似乎没有让您的处理器访问状态存储。

    确保您在StreamsBuilder 中调用了addStateStore()

    要获取处理器中的状态存储,我们需要使用context.getStateStore(storeName)。 可以参考以下example

    (我不认为我们可以访问filter() 中的状态存储,因为它是无状态操作)。因此,您可以使用 Processor 或 Transformer 并传入状态存储名称(在您的情况下为 MyStateStore)。

【讨论】:

感谢您的示例。我的代码中的任何地方都没有 ProcessorContext,我相信我们正试图避免它,因为它位于 kafka 的较低级别?无论如何要在没有ProcessorContext的情况下获得StateStore吗?类似于我们如何获取 ReadOnlyKeyValueStore(使用 getKafkaStream().store("storename", QueryableStoreTypes.KeyValueStore())) @ZunairSyed ReadOnlyKeyValueStore,顾名思义,我们不能修改它。所以这是为了读取拓扑之外的存储中的值。我不认为我们可以在没有上下文对象的情况下对状态存储进行写访问,而filter() 似乎没有。

以上是关于删除 Kafka StateStore 中的记录不起作用(在 .delete(key) 上抛出 NullPointerException)的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Streams API:避免在 KTable.mapValues 中添加额外的 stateStore

如何从 Kafka 全局状态存储中删除记录?

Kafka Connect JDBC Sink 连接器:如何删除没有 NULL 值的记录?

不使用 Kafka Connect 复制架构更改

Kafka Connect with MSSQL 不适用于删除操作

如果发生异常,如何禁用记录 Kafka 批处理中的所有消息?