确保已使用 REST 代理从 Kafka 主题读取所有消息
Posted
技术标签:
【中文标题】确保已使用 REST 代理从 Kafka 主题读取所有消息【英文标题】:Ensuring that all messages have been read from Kafka topic using REST Proxy 【发布时间】:2019-12-04 22:27:09 【问题描述】:我是 Kafka 新手,我们的团队正在研究服务间通信的模式。
目标
我们有两个服务,P(生产者)和 C(消费者)。 P 是 C 需要的一组数据的真实来源。当 C 启动时,它需要将所有当前数据从 P 加载到它的缓存中,然后订阅更改通知。 (换句话说,我们希望在服务之间同步数据。)
数据总量比较少,变化不频繁。同步的短暂延迟是可以接受的(最终一致性)。
我们想解耦服务,这样 P 和 C 就不需要互相了解了。
提案
当 P 启动时,它会将其所有数据发布到启用了日志压缩的 Kafka 主题。每条消息都是一个aggregate,并带有一个作为其 ID 的键。
当 C 启动时,它会从主题的开头读取所有消息并填充其缓存。然后它会继续读取其偏移量以收到更新通知。
当 P 更新其数据时,它会为已更改的聚合发布一条消息。 (此消息与原始消息具有相同的架构。)
当 C 收到一条新消息时,它会更新其缓存中的相应数据。
约束
我们正在使用Confluent REST Proxy 与 Kafka 进行通信。
问题
当 C 启动时,它如何知道它何时从主题中读取了所有消息,以便可以安全地开始处理?
如果 C 没有立即注意到 P 一秒钟前发送的消息,这是可以接受的。如果 C 在消费一小时前 P 发送的消息之前开始处理,这是不可接受的。请注意,我们不知道何时会更新 P 的数据。
我们不希望 C 在消费每条消息后必须等待 REST 代理的轮询间隔。
【问题讨论】:
我不太明白你的第一个问题“当 C 启动时,它如何知道它何时读取了主题中的所有消息,以便它可以安全地开始处理?”。您的意思是消费者如何知道从哪个偏移量开始处理,以防它之前停止? @GiorgosMyrianthous 我认为问题更多是关于确保 C 具有完全填充的缓存。 C 需要完全填充其内部数据缓存并保持最新,然后才能完成其引导过程。从日志中读取时,没有任何迹象表明 C 已经消耗了它需要的所有内容。 C 不一定知道日志中有多少条记录,也不一定知道应该有多少条记录组成它的缓存。所以它无法知道它的缓存何时准备好执行操作。 @CTC 是正确的。在 CLI 中,我可以检查我的消费者组的延迟是否为 0,这意味着它是最新的写入。从 REST 代理,我还没有看到任何方法来检查这个。 REST 代理不也暴露组信息吗?或者至少是主题偏移信息 是的。您可以创建一个不读取任何数据的假组,然后将其查找到最后,然后从中找到偏移量……至少这是一种方法,无需使用会暴露滞后的单独服务 【参考方案1】:如果你想找到一个消费者组的结束分区,为了知道你什么时候得到了一个时间点的所有数据,你可以使用
POST /consumers/(string: group_name)/instances/(string: instance)/positions/end
请注意,您必须在寻找之前进行投票 (GET /consumers/.../records
),但您不需要提交。
如果您不想影响现有消费者组的偏移量,则必须单独发布一个。
然后您可以使用
查询偏移量GET /consumers/(string: group_name)/instances/(string: instance)/offsets
请注意,在计算结束偏移量和实际到达结束之间可能会有数据写入主题,因此您可能希望在最终到达结束时进行一些额外的设置以进行更多消耗。
【讨论】:
POST 到/positions/end
会更改获取(已使用)偏移量,但不会提交 偏移量。 GETting /offsets
获取最后的 committed 偏移量。即使在寻求结束后提交补偿似乎也不会改变任何事情。有什么想法吗?
看来你必须寻求结束,轮询(GET /records
),然后提交偏移量。之后,您可以查询以获取偏移量。
我们确实实施了这个解决方案,但在测试期间得到了不确定的结果。有时,当它应该返回值时,我们会得到一个用于偏移量的空数组。它依赖于从获取请求超时到 max_bytes 查询参数的所有内容,我不推荐这种方法。【参考方案2】:
替代解决方案(未测试):
如果消费者同时也是生产者呢?
-
当 C 启动时,它会向压缩主题(将要读取的主题相同)发布一条消息,其中的键不会与 P 中的键重叠。该值是 GUID 或随机数;基本上是一个随机数。
C 订阅压缩主题并开始消费。
当 C 收到其唯一密钥且随机数与它发送的内容匹配时(如果清理线程尚未压缩日志,它可能会多次获取该密钥),它知道它可以安全地开始处理。
这确实假设一个分区。
【讨论】:
以上是关于确保已使用 REST 代理从 Kafka 主题读取所有消息的主要内容,如果未能解决你的问题,请参考以下文章