仅读取来自 kafka 主题的特定消息

Posted

技术标签:

【中文标题】仅读取来自 kafka 主题的特定消息【英文标题】:reading only specific messages from kafka topic 【发布时间】:2019-02-18 07:22:07 【问题描述】:

场景:

我正在将数据 JSON 对象数据写入 kafka 主题,同时阅读我想根据消息中存在的值读取一组特定的消息。我正在使用 kafka-python 库。

示例消息:

flow_status: "completed", value: 1, active: yes
flow_status:"failure",value 2, active:yes

在这里,我只想阅读 flow_Status 为已完成的消息。

【问题讨论】:

我创建了 kafka 客户,他们可以在 spring kafka 的帮助下做到这一点。看完这篇博文rcvaram.medium.com/…,你可能会有所了解 【参考方案1】:

在 Kafka 中,不可能做这样的事情。 消费者从最新提交的偏移量开始(或从头开始,或在特定偏移量处寻找)一个接一个地消费消息。 取决于您的用例,也许您的场景中可能有不同的流程:处理该过程的消息进入一个主题,然后是处理该操作的应用程序,然后将结果(完成或失败)写入两个不同的主题: 这样你就完成了,从失败中分离出来。 另一种方法是使用 Kafka Streams 应用程序进行过滤,但考虑到它只是一种糖,实际上流应用程序将始终读取所有消息,但允许您轻松过滤消息。

【讨论】:

所以我可以有 3 个主题,1 个用于整个日志,1 个用于完成状态,1 个用于失败状态...作业将写入主题 1,然后将根据状态过滤数据到其他主题。 确切地说,不知何故,您的状态是在此用例中值得不同主题的消息类型(一个代表完成,一个代表失败) 这是一个好方法吗,有两个分区的单个主题(一个用于完成,一个用于失败),而发送将保留生产者中的逻辑以将数据发送到各个分区......在消费者端,将创建单独的 consumer_groups ,一组从失败的分区中读取,另一组从完成的分区中读取 生产者端可能很好,但是您需要实现一个自定义分区器来做到这一点。在消费者方面则完全相反,两个消费者需要在同一个消费者组中,才能为每个消费者分配一个分区。如果他们属于不同的消费者组,他们将从两个分区获取所有消息。无论如何它都不能很好地工作,因为如果一个消费者崩溃,另一个消费者将获得另一个分区(接收完成和失败的消息)。您可以避免使用消费者组,而是直接分配分区。【参考方案2】:

您可以创建两个不同的主题;一个表示已完成,另一个表示失败状态。然后从已完成的主题中读取消息进行处理。

否则,如果您希望它们在一个主题中并且只想阅读已完成的主题,我相信您需要将它们全部阅读并使用简单的 if-else 条件忽略失败的主题。

【讨论】:

【参考方案3】:

Kafka 消费者不预先支持这种功能。您必须按顺序使用所有事件,过滤掉状态已完成的事件并将其放在某个地方。相反,您可以考虑使用 Kafka Streams 应用程序,您可以在其中将数据作为流读取并过滤 flow_status = "completed" 的事件并在某些输出主题或其他目的地中发布。

例子:

KStream<String,JsonNode> inputStream= builder.stream(inputTopic);
KStream<String,JsonNode> completedFlowStream = inputStream.filter(value-> value.get("flow_status").equals("completed"));

附: Kafka 没有 KStream 的 Python API 官方版本,但有开源项目:https://github.com/wintoncode/winton-kafka-streams

【讨论】:

【参考方案4】:

目前还不能在代理端实现它,有一个 Jira 功能请求向 apache kafka 开放以实现此功能,您可以在这里跟踪它,我希望他们能在不久的将来实现这个功能: https://issues.apache.org/jira/browse/KAFKA-6020

我觉得最好的方法是使用 RecordFilterStrategy (Java/spring) 接口并在消费者端对其进行过滤。

【讨论】:

以上是关于仅读取来自 kafka 主题的特定消息的主要内容,如果未能解决你的问题,请参考以下文章

无法读取 Kafka 主题 avro 消息

是否可以仅将 SMT(单消息转换)应用于来自指定主题的消息

阅读来自 kafka 的最新消息-segmentio/kafka-go

确保已使用 REST 代理从 Kafka 主题读取所有消息

卡夫卡消费者:受控阅读主题

如何忽略从同一主题读取和写入不同事件类型的 Kafka Streams 应用程序中的某些类型的消息