仅读取来自 kafka 主题的特定消息
Posted
技术标签:
【中文标题】仅读取来自 kafka 主题的特定消息【英文标题】:reading only specific messages from kafka topic 【发布时间】:2019-02-18 07:22:07 【问题描述】:场景:
我正在将数据 JSON 对象数据写入 kafka 主题,同时阅读我想根据消息中存在的值读取一组特定的消息。我正在使用 kafka-python 库。
示例消息:
flow_status: "completed", value: 1, active: yes
flow_status:"failure",value 2, active:yes
在这里,我只想阅读 flow_Status 为已完成的消息。
【问题讨论】:
我创建了 kafka 客户,他们可以在 spring kafka 的帮助下做到这一点。看完这篇博文rcvaram.medium.com/…,你可能会有所了解 【参考方案1】:在 Kafka 中,不可能做这样的事情。 消费者从最新提交的偏移量开始(或从头开始,或在特定偏移量处寻找)一个接一个地消费消息。 取决于您的用例,也许您的场景中可能有不同的流程:处理该过程的消息进入一个主题,然后是处理该操作的应用程序,然后将结果(完成或失败)写入两个不同的主题: 这样你就完成了,从失败中分离出来。 另一种方法是使用 Kafka Streams 应用程序进行过滤,但考虑到它只是一种糖,实际上流应用程序将始终读取所有消息,但允许您轻松过滤消息。
【讨论】:
所以我可以有 3 个主题,1 个用于整个日志,1 个用于完成状态,1 个用于失败状态...作业将写入主题 1,然后将根据状态过滤数据到其他主题。 确切地说,不知何故,您的状态是在此用例中值得不同主题的消息类型(一个代表完成,一个代表失败) 这是一个好方法吗,有两个分区的单个主题(一个用于完成,一个用于失败),而发送将保留生产者中的逻辑以将数据发送到各个分区......在消费者端,将创建单独的 consumer_groups ,一组从失败的分区中读取,另一组从完成的分区中读取 生产者端可能很好,但是您需要实现一个自定义分区器来做到这一点。在消费者方面则完全相反,两个消费者需要在同一个消费者组中,才能为每个消费者分配一个分区。如果他们属于不同的消费者组,他们将从两个分区获取所有消息。无论如何它都不能很好地工作,因为如果一个消费者崩溃,另一个消费者将获得另一个分区(接收完成和失败的消息)。您可以避免使用消费者组,而是直接分配分区。【参考方案2】:您可以创建两个不同的主题;一个表示已完成,另一个表示失败状态。然后从已完成的主题中读取消息进行处理。
否则,如果您希望它们在一个主题中并且只想阅读已完成的主题,我相信您需要将它们全部阅读并使用简单的 if-else 条件忽略失败的主题。
【讨论】:
【参考方案3】:Kafka 消费者不预先支持这种功能。您必须按顺序使用所有事件,过滤掉状态已完成的事件并将其放在某个地方。相反,您可以考虑使用 Kafka Streams 应用程序,您可以在其中将数据作为流读取并过滤 flow_status = "completed" 的事件并在某些输出主题或其他目的地中发布。
例子:
KStream<String,JsonNode> inputStream= builder.stream(inputTopic);
KStream<String,JsonNode> completedFlowStream = inputStream.filter(value-> value.get("flow_status").equals("completed"));
附: Kafka 没有 KStream 的 Python API 官方版本,但有开源项目:https://github.com/wintoncode/winton-kafka-streams
【讨论】:
【参考方案4】:目前还不能在代理端实现它,有一个 Jira 功能请求向 apache kafka 开放以实现此功能,您可以在这里跟踪它,我希望他们能在不久的将来实现这个功能: https://issues.apache.org/jira/browse/KAFKA-6020
我觉得最好的方法是使用 RecordFilterStrategy (Java/spring) 接口并在消费者端对其进行过滤。
【讨论】:
以上是关于仅读取来自 kafka 主题的特定消息的主要内容,如果未能解决你的问题,请参考以下文章