如何使用Apache Kafka进行内容过滤?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何使用Apache Kafka进行内容过滤?相关的知识,希望对你有一定的参考价值。

我有一个名为mytopic的主题。此主题有一个生产者和两个消费者。我需要做的是根据前缀过滤生产者生成的消息。例如,如果消息以“a”前缀开头,则只有第一个消费者必须接受它。如果它以'b'前缀开头,则只有第二个消费者必须接受它。

我做了很多搜索,我找到的是过滤来自主题的消息,然后在过滤后将它们发送到不同的主题。但如上所述,我需要对一个主题进行过滤。我怎么能在卡夫卡那样做?

答案

一旦获得记录,允许两个消费者使用所有数据,使用具有特定于消费者的过滤器逻辑的java流过滤它们。

简而言之,我的意思是只是按原样获取数据并使用Java代码过滤它们,而不是在Kafka Level上进行过滤。

更新:

如果要在Kafka级别进行过滤,则可以使用分区,同时向kafka主题发送消息,将带有前缀“a”的消息发送到Partition-1,将带有前缀“b”的消息发送到Partition-2。

现在,消费只消耗相应消费者中的特定分区。

另一答案

这很简单,不需要回写不同的主题。 “2个消费者”是指1个消费群体中的2个消费者群体或2个消费者群体? 我会谈两个。

如果它是1个使用者组中的2个使用者线程,则可以使用消息“密钥”字段。 Kafka将相同的“密钥”消息发送到同一个“分区”。 例如,带有键字段'a'的消息前缀'a',带有键字段'b'的消息前缀'b',然后Kafka将消息发送到'Partition-1',b消息发送到'Partition-2 ”。消费者线程A可以订阅指定的'mytopic-Partition-1',线程B可以使用类'org.apache.kafka.clients.consumer.KafkaConsumer'中的'assign'方法订阅'mytopic-Partition-2'。

如果它是2个用户组,只需订阅主题并在代码中过滤。如果不满意,请使用上面相同的方法。 技巧是将特定的前缀消息发送到特定的“分区”。 如果你真的想要过滤器,也许你可以使用Kafka Connect插件。

以上是关于如何使用Apache Kafka进行内容过滤?的主要内容,如果未能解决你的问题,请参考以下文章

Apache Flink:使用Apache Kafka作为DataSource的简单demo

java spark-streaming接收TCP/Kafka数据

Apache Kafka:使用java方式操作stream(实现官方的wordcount)

Apache Kafka:集群的搭建和测试

使用 Apache Kafka 将数据从 MSSQL 同步到 Elasticsearch

Apache Kafka:使用java和命令行方式操作topic(admin api)