Kafka 消费者 API 上的 Kafka Streams DSL
Posted
技术标签:
【中文标题】Kafka 消费者 API 上的 Kafka Streams DSL【英文标题】:Kafka Streams DSL over Kafka Consumer API 【发布时间】:2020-12-30 21:57:58 【问题描述】:最近,在一次采访中,有人问我一个关于 Kafka Streams 的问题,更具体地说,面试官想知道为什么/什么时候你会使用 Kafka Streams DSL 而不是普通的 Kafka Consumer API 来读取和处理消息流?我无法提供令人信服的答案,并且想知道使用这两种流处理方式的其他人是否可以分享他们的想法/意见。谢谢。
【问题讨论】:
简单的回答是消费者不能轻易加入不同的话题。 【参考方案1】:像往常一样,这取决于用例何时使用 KafkaStreams API 以及何时使用普通的 KafkaProducer/Consumer。我不敢笼统地选择一个。
首先,KafkaStreams 构建在 KafkaProducers/Consumers 之上,因此使用 KafkaStreams 可以实现的一切也可以通过普通的 Consumer/Producers 实现。
我想说,与普通的消费者/生产者相比,KafkaStreams API 不太复杂,但也不太灵活。现在我们可以开始长时间讨论什么是“少”了。
在开发 Kafka Streams API 时,您可以直接跳入您的业务逻辑应用方法,例如 filter
、map
、join
或 aggregate
,因为所有消费和生产部分都在幕后抽象出来.
当您使用普通的消费者/生产者开发应用程序时,您需要考虑如何在 subscribe
、poll
、send
、flush
等级别构建客户端。
如果您想要更简单(但也更不灵活)ksqldb 是您可以选择构建 Kafka 应用程序的另一个选项。
【讨论】:
【参考方案2】:以下是一些您可能更喜欢 Kafka 流而不是核心生产者/消费者 API 的场景:
它允许您轻松构建复杂的处理管道。所以。让我们假设(一个人为的例子)您有一个包含客户订单的主题,并且您希望根据交货城市过滤订单并将它们保存到数据库表中以进行持久性和 Elasticsearch 索引以实现快速搜索体验。在这种情况下,您将使用来自源主题的消息,使用 Streams DSL filter
函数根据城市过滤掉不必要的订单,将过滤器数据存储到单独的 Kafka 主题(使用 KStream.to()
或 @987654323 @),最后使用 Kafka Connect,将消息存储到数据库表和 Elasticsearch 中。你也可以使用核心的生产者/消费者 API 来做同样的事情,但它会需要更多的编码。
在数据处理管道中,您可以在同一事务中执行消费-过程-生产。因此,在上面的示例中,Kafka 将确保从源主题到数据库和 Elasticsearch 的精确一次语义和事务。由于网络故障和重试,不会引入任何重复的消息。当您在单个产品级别进行订单计数等聚合时,此功能特别有用。在这种情况下,重复总是会给你错误的结果。
您还可以以极低的延迟丰富您的传入数据。假设在上面的示例中,您希望使用存储的客户数据中的客户电子邮件地址来丰富订单数据。如果没有 Kafka Streams,你会怎么做?您可能会为网络上的每个传入订单调用 REST API,这绝对是一项影响吞吐量的昂贵操作。在这种情况下,您可能希望将所需的客户数据存储在压缩的 Kafka 主题中,并使用 KTable
或 GlobalKTable
将其加载到流应用程序中。现在,您只需在 KTable 中为客户电子邮件地址进行简单的本地查找。请注意,这里的 KTable 数据将存储在 Kafka Streams 附带的嵌入式 RocksDB 中,并且由于 KTable 由 Kafka 主题支持,您在流应用程序中的数据将实时不断更新。换句话说,不会有过时的数据。这本质上是物化视图模式的一个例子。
假设您想加入两个不同的数据流。因此,在上面的示例中,您只想处理已成功付款的订单,并且付款数据来自另一个 Kafka 主题。现在,可能会发生付款延迟或付款事件发生在订单事件之前的情况。在这种情况下,您可能需要进行一小时的窗口连接。因此,如果订单和相应的支付事件在一小时内出现,订单将被允许继续进行进一步处理。如您所见,您需要将状态存储一小时窗口,该状态将存储在 Kafka Streams 的 Rocks DB 中。
【讨论】:
以上是关于Kafka 消费者 API 上的 Kafka Streams DSL的主要内容,如果未能解决你的问题,请参考以下文章