如何从 Kafka 全局状态存储中删除记录?
Posted
技术标签:
【中文标题】如何从 Kafka 全局状态存储中删除记录?【英文标题】:How to Delete Record from Kafka global state store? 【发布时间】:2020-08-29 12:41:53 【问题描述】:恢复期间的全局状态存储将转储来自源主题的数据(这被认为是全局存储的更改日志主题)。
为了删除一条记录,我执行如下操作
kvStore.put("key-1",null)
Kafka 如何知道记录已被删除,并且在恢复过程中它会从源主题转储记录(考虑源主题有一条 key-1 的记录)
在我的拓扑中我有
输入主题 -> T1 并附加了一个从 T1 读取数据并从记录中构造一个键并向下转发到主题 T2 的进程 主题 T2 是全局状态存储的源主题。例子:
T1 我得到了数据:"id":'123', "name":"Mohit", "type":"insert"
构造一个键记录并转发到主题,键和值为T2 -> 键:123
和值:"id":'123', "name":"Mohit"
在相同的关键记录之后作为数据类型删除。
T1得到数据:"id":'123', "name":"Mohit", "type":"insert"
所以我像这样转发记录
this.context.forward(key, null)
key: 123 value:null
在状态存储中更新相同
我只想知道,在恢复过程中,这条记录将被删除,这意味着如果我使用密钥 123
进入商店,我将获得空值。
【问题讨论】:
不是 100% 确定我是否理解这个问题。但是,全局存储是只读的,您不能直接放置/删除数据(全局处理器只能按原样从主题中获取日期来更新存储)。 【参考方案1】:状态存储更改日志是压缩主题。要从压缩主题中删除消息,您需要执行put(key, null)
操作。具有null
值的消息称为墓碑,它最终会被主题清理器删除。
请注意,消息只会(最终)在状态存储中被删除,而不是在输入主题中。
最后,键为123
的记录应该从状态存储中完全删除。
【讨论】:
我说的是全局状态存储使用源主题作为他在恢复期间的更改日志。由于数据在源主题中存在 key:123 ,因此它将在全局存储中加载数据 是的,它会加载全局状态sore的数据,但由于值为“null”,它最终会从存储中删除。 我认为您别无选择,只能在(压缩的)主题中放置空值。否则将不会考虑恢复。以上是关于如何从 Kafka 全局状态存储中删除记录?的主要内容,如果未能解决你的问题,请参考以下文章
删除 Kafka StateStore 中的记录不起作用(在 .delete(key) 上抛出 NullPointerException)