Kafka Streams - 状态存储可能已迁移到另一个实例

Posted

技术标签:

【中文标题】Kafka Streams - 状态存储可能已迁移到另一个实例【英文标题】:Kafka Streams - The state store may have migrated to another instance 【发布时间】:2018-08-24 02:18:44 【问题描述】:

我正在编写一个基本应用程序来测试 Kafka Streams 的交互式查询功能。代码如下:

public static void main(String[] args) 
    StreamsBuilder builder = new StreamsBuilder();

    KeyValueBytesStoreSupplier waypointsStoreSupplier = Stores.persistentKeyValueStore("test-store");
    StoreBuilder waypointsStoreBuilder = Stores.keyValueStoreBuilder(waypointsStoreSupplier, Serdes.ByteArray(), Serdes.Integer());

    final KStream<byte[], byte[]> waypointsStream = builder.stream("sample1");

    final KStream<byte[], TruckDriverWaypoint> waypointsDeserialized =  waypointsStream
                                                                        .mapValues(CustomSerdes::deserializeTruckDriverWaypoint)
                                                                        .filter((k,v) -> v.isPresent())
                                                                        .mapValues(Optional::get);

    waypointsDeserialized.groupByKey().aggregate(
            () -> 1,
            (aggKey, newWaypoint, aggValue) -> 

                aggValue = aggValue + 1;
                return aggValue;

            , Materialized.<byte[], Integer, KeyValueStore<Bytes, byte[]>>as("test-store").withKeySerde(Serdes.ByteArray()).withValueSerde(Serdes.Integer())
    );

    final KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(createStreamsProperties()));

    streams.cleanUp();
    streams.start();    

    ReadOnlyKeyValueStore<byte[], Integer> keyValueStore = streams.store("test-store", QueryableStoreTypes.keyValueStore());

    KeyValueIterator<byte[], Integer> range = keyValueStore.all();
    while (range.hasNext()) 
        KeyValue<byte[], Integer> next = range.next();
        System.out.println(next.value);

    

    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));




protected static Properties createStreamsProperties() 

    final Properties streamsConfiguration = new Properties();

    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "random167");
    streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "client-id");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    streamsConfiguration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Serdes.Integer().getClass().getName());
    //streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);

    return streamsConfiguration;

所以我的问题是,每次运行时都会遇到同样的错误:

线程“main”org.apache.kafka.streams.errors.InvalidStateStoreException 中的异常:状态存储测试存储可能已迁移到另一个实例。

我只运行 1 个应用程序实例,而我正在消费的主题只有 1 个分区。

知道我做错了什么吗?

【问题讨论】:

异常消息很笼统。这是一个已知问题:issues.apache.org/jira/browse/KAFKA-5876 -- 您应该监控应用程序的状态 -- 只有在应用程序正在运行时才能查询商店:docs.confluent.io/current/streams/… @MatthiasJ.Sax,我确实监控了 KafaStreams 的状态,并确保仅在状态为 RUNNING 时查询存储。我仍然遇到同样的错误。 不确定自动取款机。正如@kyle 在他的回答中提到的那样,通常您需要知道商店可能在任何时候都不可用,因此您需要重试。另请注意,在启动时,KafkaStreams 会执行 CRAETED -> RUNNING -> REBALANCING -> RUNNING 转换——因此,如果您在第一次重新平衡后尝试查询,您很可能会因为重新平衡立即发生而得到此异常。 你解决过这个问题吗?有同样的问题,但仅限于订阅多个汇合连接源的流。尽管重试逻辑和 State = RUNNING 的事实,对于这些流,重新启动流应用程序时始终会抛出 InvalidStateStoreException。如果我擦除偏移量和存储,然后启动应用程序,它工作正常。 @mike01010 不,我没有解决它。抛出的异常不够清楚,无法知道问题的根源。 【参考方案1】:

看起来你有一个竞争条件。从 KafkaStreams::start() 的 kafka 流 javadoc 中它说:

通过启动所有线程来启动 KafkaStreams 实例。该函数预计在客户端的生命周期中仅被调用一次。 因为线程是在后台启动的,所以这个方法不会阻塞。

https://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/streams/KafkaStreams.html

您在streams.start() 之后立即调用streams.store(),但我敢打赌您处于尚未完全初始化的状态。

由于此代码似乎仅用于测试,请在其中添加 Thread.sleep(5000) 或其他内容并试一试。 (这不是生产解决方案)根据您对该主题的输入率,这可能会给商店一些时间来开始填充事件,以便您的 KeyValueIterator 实际上有一些东西要处理/打印。

【讨论】:

那么您建议在生产中做什么? 不是一个详尽的列表,但是:a) 将 store 客户端与服务器分开,b) 在客户端中添加强大的错误处理以处理客户端无法连接到 kafka 流的所有情况服务器无论出于何种原因。您的应用程序如何处理这些情况?由你决定!【参考方案2】:

可能不适用于 OP,但可能对其他人有所帮助:

在尝试检索 KTable 的存储区时,请确保 KTable 的主题首先存在,否则您将收到此异常。

【讨论】:

如何查看主题是否存在? @emirhosseini 请检查以下问题的answer 谢谢!拯救了我的周末。【参考方案3】:

我在消费商店之前未能致电Storebuilder

【讨论】:

【参考方案4】:

发生这种情况通常有两个原因:

本地 KafkaStreams 实例尚未准备好(即,尚未在 运行时状态 RUNNING,请参阅运行时状态信息),因此其 还不能查询当地的国有商店。本地 KafkaStreams 实例已准备就绪(例如,处于运行时状态 RUNNING),但特定的 状态存储只是在幕后迁移到另一个实例。 这可能特别发生在分布式的启动阶段 应用程序或添加/删除应用程序实例时。

https://docs.confluent.io/platform/current/streams/faq.html#handling-invalidstatestoreexception-the-state-store-may-have-migrated-to-another-instance

最简单的方法是在调用KafkaStreams#store()时防范InvalidStateStoreException:

// Example: Wait until the store of type T is queryable.  When it is, return a reference to the store.
public static <T> T waitUntilStoreIsQueryable(final String storeName,
                                              final QueryableStoreType<T> queryableStoreType,
                                              final KafkaStreams streams) throws InterruptedException 
  while (true) 
    try 
      return streams.store(storeName, queryableStoreType);
     catch (InvalidStateStoreException ignored) 
      // store not yet ready for querying
      Thread.sleep(100);
    
  

【讨论】:

以上是关于Kafka Streams - 状态存储可能已迁移到另一个实例的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Streams State Store

我们可以使用任何其他数据库,如 MariaDB 或 MongoDB 来在 Kafka Streams 中存储状态而不是 Rocks DB,有啥方法可以配置它吗?

将Kafka Streams代码迁移到Spring Cloud Stream吗?

Kafka Connect vs Streams for Sinks [关闭]

Kafka Streams窗口加入了保留

初探Kafka Streams