如何处理 Kafka Connect Sink 中的背压?

Posted

技术标签:

【中文标题】如何处理 Kafka Connect Sink 中的背压?【英文标题】:How to handle backpressure in a Kafka Connect Sink? 【发布时间】:2018-09-29 12:56:17 【问题描述】:

我们构建了一个自定义的 Kafka Connect 接收器,该接收器又调用一个远程 REST API。如何将背压传播到 Kafka Connect 基础架构,以便在远程系统比内部消费者向 put() 传递消息的速度慢的情况下调用 put() 的频率降低? Kafka 连接文档说我们不应该在 put() 中阻塞,而是在 flush() 中阻塞。但是在 put() 中不阻塞意味着我们必须缓冲数据,这肯定会在某些时候导致 OOM 异常,如果 put() 比 flush() 更频繁地调用。 我已经看到允许 kafka 消费者调用 pause() 或在 loop() 中阻塞。是否可以在 kafka 连接接收器中利用这一点?

【问题讨论】:

【参考方案1】:

如果您可以使用某种自动缩放器,一个想法可能是在接收器上收集一些指标并相应地缩放连接器,无论是在工作人员还是在任务级别(在本例中是通过 REST API)。

【讨论】:

【参考方案2】:

我已经看到允许 kafka 消费者调用 pause() 或在 loop() 中阻塞。是否可以在 kafka 连接接收器中利用这一点?

原始消费者没有暴露,所以没有。您可以在整个连接器上调用 /pause,但我不确定此时未刷新的消息会发生什么。

但在 put() 中不阻塞意味着我们必须缓冲数据,这肯定会在某些时候导致 OOM 异常

当然可以,但这确实是保留数据超过必要时间的唯一可行选择。例如,这就是 S3 和 HDFS 连接器的工作方式。

rotate.interval.ms 调用文件提交的时间间隔(以毫秒为单位)...

您的 HTTP 客户端连接可能无论如何都会阻塞以发出请求,不是吗?

另一种方法是让您的 HTTP 服务器嵌入一个 Kafka 消费者,这样它就可以自己轮询消息并在本地对它们进行操作,而无需通过 HTTP 发送请求。

【讨论】:

>您的 HTTP 客户端连接很可能会阻塞以发出请求,不是吗? 如果他们有异步响应,那么我想你不在乎你的记录到达 Kafka 的顺序是什么?

以上是关于如何处理 Kafka Connect Sink 中的背压?的主要内容,如果未能解决你的问题,请参考以下文章

使用 Kafka Streams DSL 时如何处理错误和不提交

OpenID Connect 如何处理服务链?

OpenID Connect - 如何处理单次注销

1 周后如何处理来自分布式日志代理(例如 Kafka)的日志?

如何处理 kafka KStream 并直接写入数据库而不是发送另一个主题

如何处理 JSON 响应