Kafka Streams - 状态存储可能已迁移到另一个实例
Posted
技术标签:
【中文标题】Kafka Streams - 状态存储可能已迁移到另一个实例【英文标题】:Kafka Streams - The state store may have migrated to another instance 【发布时间】:2018-08-24 02:18:44 【问题描述】:我正在编写一个基本应用程序来测试 Kafka Streams 的交互式查询功能。代码如下:
public static void main(String[] args)
StreamsBuilder builder = new StreamsBuilder();
KeyValueBytesStoreSupplier waypointsStoreSupplier = Stores.persistentKeyValueStore("test-store");
StoreBuilder waypointsStoreBuilder = Stores.keyValueStoreBuilder(waypointsStoreSupplier, Serdes.ByteArray(), Serdes.Integer());
final KStream<byte[], byte[]> waypointsStream = builder.stream("sample1");
final KStream<byte[], TruckDriverWaypoint> waypointsDeserialized = waypointsStream
.mapValues(CustomSerdes::deserializeTruckDriverWaypoint)
.filter((k,v) -> v.isPresent())
.mapValues(Optional::get);
waypointsDeserialized.groupByKey().aggregate(
() -> 1,
(aggKey, newWaypoint, aggValue) ->
aggValue = aggValue + 1;
return aggValue;
, Materialized.<byte[], Integer, KeyValueStore<Bytes, byte[]>>as("test-store").withKeySerde(Serdes.ByteArray()).withValueSerde(Serdes.Integer())
);
final KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(createStreamsProperties()));
streams.cleanUp();
streams.start();
ReadOnlyKeyValueStore<byte[], Integer> keyValueStore = streams.store("test-store", QueryableStoreTypes.keyValueStore());
KeyValueIterator<byte[], Integer> range = keyValueStore.all();
while (range.hasNext())
KeyValue<byte[], Integer> next = range.next();
System.out.println(next.value);
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
protected static Properties createStreamsProperties()
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "random167");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "client-id");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Serdes.Integer().getClass().getName());
//streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);
return streamsConfiguration;
所以我的问题是,每次运行时都会遇到同样的错误:
线程“main”org.apache.kafka.streams.errors.InvalidStateStoreException 中的异常:状态存储测试存储可能已迁移到另一个实例。
我只运行 1 个应用程序实例,而我正在消费的主题只有 1 个分区。
知道我做错了什么吗?
【问题讨论】:
异常消息很笼统。这是一个已知问题:issues.apache.org/jira/browse/KAFKA-5876 -- 您应该监控应用程序的状态 -- 只有在应用程序正在运行时才能查询商店:docs.confluent.io/current/streams/… @MatthiasJ.Sax,我确实监控了 KafaStreams 的状态,并确保仅在状态为 RUNNING 时查询存储。我仍然遇到同样的错误。 不确定自动取款机。正如@kyle 在他的回答中提到的那样,通常您需要知道商店可能在任何时候都不可用,因此您需要重试。另请注意,在启动时,KafkaStreams 会执行 CRAETED -> RUNNING -> REBALANCING -> RUNNING 转换——因此,如果您在第一次重新平衡后尝试查询,您很可能会因为重新平衡立即发生而得到此异常。 你解决过这个问题吗?有同样的问题,但仅限于订阅多个汇合连接源的流。尽管重试逻辑和 State = RUNNING 的事实,对于这些流,重新启动流应用程序时始终会抛出 InvalidStateStoreException。如果我擦除偏移量和存储,然后启动应用程序,它工作正常。 @mike01010 不,我没有解决它。抛出的异常不够清楚,无法知道问题的根源。 【参考方案1】:看起来你有一个竞争条件。从 KafkaStreams::start()
的 kafka 流 javadoc 中它说:
通过启动所有线程来启动 KafkaStreams 实例。该函数预计在客户端的生命周期中仅被调用一次。 因为线程是在后台启动的,所以这个方法不会阻塞。
https://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/streams/KafkaStreams.html
您在streams.start()
之后立即调用streams.store()
,但我敢打赌您处于尚未完全初始化的状态。
由于此代码似乎仅用于测试,请在其中添加 Thread.sleep(5000)
或其他内容并试一试。 (这不是生产解决方案)根据您对该主题的输入率,这可能会给商店一些时间来开始填充事件,以便您的 KeyValueIterator
实际上有一些东西要处理/打印。
【讨论】:
那么您建议在生产中做什么? 不是一个详尽的列表,但是:a) 将 store 客户端与服务器分开,b) 在客户端中添加强大的错误处理以处理客户端无法连接到 kafka 流的所有情况服务器无论出于何种原因。您的应用程序如何处理这些情况?由你决定!【参考方案2】:可能不适用于 OP,但可能对其他人有所帮助:
在尝试检索 KTable 的存储区时,请确保 KTable 的主题首先存在,否则您将收到此异常。
【讨论】:
如何查看主题是否存在? @emirhosseini 请检查以下问题的answer 谢谢!拯救了我的周末。【参考方案3】:我在消费商店之前未能致电Storebuilder
。
【讨论】:
【参考方案4】:发生这种情况通常有两个原因:
本地 KafkaStreams 实例尚未准备好(即,尚未在 运行时状态 RUNNING,请参阅运行时状态信息),因此其 还不能查询当地的国有商店。本地 KafkaStreams 实例已准备就绪(例如,处于运行时状态 RUNNING),但特定的 状态存储只是在幕后迁移到另一个实例。 这可能特别发生在分布式的启动阶段 应用程序或添加/删除应用程序实例时。
https://docs.confluent.io/platform/current/streams/faq.html#handling-invalidstatestoreexception-the-state-store-may-have-migrated-to-another-instance
最简单的方法是在调用KafkaStreams#store()时防范InvalidStateStoreException:
// Example: Wait until the store of type T is queryable. When it is, return a reference to the store.
public static <T> T waitUntilStoreIsQueryable(final String storeName,
final QueryableStoreType<T> queryableStoreType,
final KafkaStreams streams) throws InterruptedException
while (true)
try
return streams.store(storeName, queryableStoreType);
catch (InvalidStateStoreException ignored)
// store not yet ready for querying
Thread.sleep(100);
【讨论】:
以上是关于Kafka Streams - 状态存储可能已迁移到另一个实例的主要内容,如果未能解决你的问题,请参考以下文章
我们可以使用任何其他数据库,如 MariaDB 或 MongoDB 来在 Kafka Streams 中存储状态而不是 Rocks DB,有啥方法可以配置它吗?
将Kafka Streams代码迁移到Spring Cloud Stream吗?