如何在 Kubernetes 上部署 Kafka Stream 应用程序?

Posted

技术标签:

【中文标题】如何在 Kubernetes 上部署 Kafka Stream 应用程序?【英文标题】:How to deploy Kafka Stream applications on Kubernetes? 【发布时间】:2018-09-04 02:11:00 【问题描述】:

我的应用程序有一些聚合/窗口操作,所以它有一些存储在state.dir 中的状态存储。 AFAIK,它还将状态存储的更改日志写入代理, 那么可以将 Kafka Stream 应用程序视为无状态 POD 吗?

【问题讨论】:

【参考方案1】:

我的应用程序有一些聚合/窗口操作,所以它有一些存储在state.dir 中的状态存储。 AFAIK,它还将状态存储的更改日志写入代理,因此可以将 Kafka Stream 应用程序视为无状态 POD 吗?

无状态 pod 和数据安全(= 无数据丢失): 是的,就数据安全而言,您可以将应用程序视为无状态 pod;即,无论 pod 发生什么情况,Kafka 和 Kafka Streams 都保证您不会丢失数据(如果您启用了一次性处理,它们也将保证后者)。

这是因为,正如您已经说过的,应用程序中的状态更改始终通过相应状态存储的更改日志持续备份到 Kafka(代理)——除非您明确禁用此更改日志功能(默认情况下启用)。

注意:当您不使用 Kafka 的 Streams 默认存储引擎 (RocksDB) 而是使用替代的内存存储引擎时,上述情况甚至是正确的。许多人没有意识到这一点,因为他们阅读“内存中”并(错误地)得出“当机器崩溃、重新启动等时数据将丢失”的结论。

无状态 pod 和应用程序恢复/恢复时间: 综上所述,您应该了解在 pod 重新启动后具有与不具有本地状态将如何影响应用程序的恢复/恢复时间(或者更确切地说:应用程序实例),直到它再次完全运行。

假设您的有状态应用程序的一个实例在一台机器上运行。它将其本地状态存储在state.dir 下,并且它还将持续将其本地状态的任何更改备份到远程 Kafka 集群(代理)。

如果应用实例正在重新启动并且没有可以访问其先前的state.dir(可能是因为它在不同的机器上重新启动),它将通过从Kafka 中相关的变更日志。根据您的状态大小,这可能需要几毫秒、几秒、几分钟或更长时间。只有在其状态完全恢复后,它才会开始处理新数据。 如果正在重新启动应用程序实例并且确实可以访问其以前的state.dir(可能是因为它在同一台原始机器上重新启动),它可以更快地恢复,因为它可以重用所有或大部分现有的本地状态,因此只需要从相关的变更日志中恢复一个小的增量。只有在其状态完全恢复后,它才会开始处理新数据。

换句话说,如果您的应用程序能够重用现有的本地状态,那么这很好,因为它将最大限度地减少应用程序的恢复时间。

备用副本在无状态环境中进行救援:但是,即使您正在运行无状态 Pod,您也可以选择通过 num.standby.replicas 设置将应用程序配置为使用 standby replicas,从而最大限度地减少应用程序恢复时间:

num.standby.replicas

备用副本的数量。备用副本是本地状态存储的卷影副本。只要有足够的实例在运行,Kafka Streams 就会尝试创建指定数量的副本并使它们保持最新。备用副本用于最小化任务故障转移的延迟。之前在故障实例上运行的任务优先在具有备用副本的实例上重新启动,以便可以最大限度地减少从其更改日志中恢复本地状态存储的过程。

另见文档部分State restoration during workload rebalance

2018-08-29 更新:可以说,在 Kubernetes 上运行 Kafka/Kafka Streams/KSQL 最方便的选择是使用 Confluent Operator 或 Confluent 提供的 Helm Charts,请参阅 https://www.confluent.io/confluent-operator/。 (免责声明:我为 Confluent 工作。)

2019-01-10 更新:还有一个 Youtube 视频演示如何Scale Kafka Streams with Kubernetes。

【讨论】:

【参考方案2】:

KStreams 使用底层state.dir 进行本地存储。如果 pod get 在同一台机器上重新启动,并且卷已安装,它将立即从原来的位置恢复。

如果 pod 在本地状态不可用的另一台机器上启动,KStreams 将通过重新读取支持的 Kafka 主题来重建状态

https://www.youtube.com/watch?v=oikZg7_vy6A 的短视频展示了 Lenses - 用于 Apache Kafka - 在 Kubernetes 上部署和扩展 KStream 应用程序

【讨论】:

如果在同一主机上挂载一个hostPath volume和多个POD,会不会冲突? 另一个场景,一开始POD在一台主机上运行,​​运行一段时间后退出,再运行在另一台主机上,运行一段时间后又退出又回到第一台主机,此时第一台主机包含过时的状态存储数据,会不会混淆Kafka流? 如果您可以移动音量,您处于更好的位置(状态恢复更快)。在一个 pod 退出(即机器重启器)的常见场景中,另一个 pod 将启动并接手工作。如果存在特定的退出条件,即错误代码/错误数据,您的 pod 将不断失败【参考方案3】:

我想是的。 RocksDB 用于保存状态,以便在执行需要状态本身的操作时快速。正如您已经提到的,状态更改也存储在 Kafka 主题中,因此如果当前流应用程序实例失败,另一个实例(在另一个节点上)可以使用该主题重新构建本地状态并继续处理流作为上一个。

【讨论】:

以上是关于如何在 Kubernetes 上部署 Kafka Stream 应用程序?的主要内容,如果未能解决你的问题,请参考以下文章

Kubernetes 集群中的 Kafka - 如何从 Kubernetes 集群外部发布/使用消息

如何在kubernetes上构建kafka集群后公开kafka以进行外部访问?

从本地 Docker For Mac 中部署的服务访问本地 Kafka(包括 Kubernetes 扩展)

在 Kubernetes 中运行 Kafka 时如何管理页面缓存资源

如何在 Kafka 连接器中正确连接 Elastic Operator 部署的 Elasticsearch?

K8S环境快速部署Kafka(K8S外部可访问)