如何从 Kafka 全局状态存储中删除记录?

Posted

技术标签:

【中文标题】如何从 Kafka 全局状态存储中删除记录?【英文标题】:How to Delete Record from Kafka global state store? 【发布时间】:2020-08-29 12:41:53 【问题描述】:

恢复期间的全局状态存储将转储来自源主题的数据(这被认为是全局存储的更改日志主题)。

为了删除一条记录,我执行如下操作

kvStore.put("key-1",null)

Kafka 如何知道记录已被删除,并且在恢复过程中它会从源主题转储记录(考虑源主题有一条 key-1 的记录)

在我的拓扑中我有

输入主题 -> T1 并附加了一个从 T1 读取数据并从记录中构造一个键并向下转发到主题 T2 的进程 主题 T2 是全局状态存储的源主题。

例子:

T1 我得到了数据:"id":'123', "name":"Mohit", "type":"insert" 构造一个键记录并转发到主题,键和值为T2 -> 键:123 和值:"id":'123', "name":"Mohit"

在相同的关键记录之后作为数据类型删除。 T1得到数据:"id":'123', "name":"Mohit", "type":"insert"

所以我像这样转发记录

this.context.forward(key, null)
key: 123 value:null

在状态存储中更新相同

我只想知道,在恢复过程中,这条记录将被删除,这意味着如果我使用密钥 123 进入商店,我将获得空值。

【问题讨论】:

不是 100% 确定我是否理解这个问题。但是,全局存储是只读的,您不能直接放置/删除数据(全局处理器只能按原样从主题中获取日期来更新存储)。 【参考方案1】:

状态存储更改日志是压缩主题。要从压缩主题中删除消息,您需要执行put(key, null) 操作。具有null 值的消息称为墓碑,它最终会被主题清理器删除。

请注意,消息只会(最终)在状态存储中被删除,而不是在输入主题中。

最后,键为123 的记录应该从状态存储中完全删除。

【讨论】:

我说的是全局状态存储使用源主题作为他在恢复期间的更改日志。由于数据在源主题中存在 key:123 ,因此它将在全局存储中加载数据 是的,它会加载全局状态sore的数据,但由于值为“null”,它最终会从存储中删除。 我认为您别无选择,只能在(压缩的)主题中放置空值。否则将不会考虑恢复。

以上是关于如何从 Kafka 全局状态存储中删除记录?的主要内容,如果未能解决你的问题,请参考以下文章

删除 Kafka StateStore 中的记录不起作用(在 .delete(key) 上抛出 NullPointerException)

Kafka Connect JDBC Sink 连接器:如何删除没有 NULL 值的记录?

从存储库历史记录中删除提交 [重复]

如何从 terraform 状态中删除资源?

如何在某个日期范围内从表存储中删除记录?

#yyds干货盘点# 阿里四面:kafka何时如何删除Topic?