如何处理 Kafka Connect Sink 中的背压?
Posted
技术标签:
【中文标题】如何处理 Kafka Connect Sink 中的背压?【英文标题】:How to handle backpressure in a Kafka Connect Sink? 【发布时间】:2018-09-29 12:56:17 【问题描述】:我们构建了一个自定义的 Kafka Connect 接收器,该接收器又调用一个远程 REST API。如何将背压传播到 Kafka Connect 基础架构,以便在远程系统比内部消费者向 put() 传递消息的速度慢的情况下调用 put() 的频率降低? Kafka 连接文档说我们不应该在 put() 中阻塞,而是在 flush() 中阻塞。但是在 put() 中不阻塞意味着我们必须缓冲数据,这肯定会在某些时候导致 OOM 异常,如果 put() 比 flush() 更频繁地调用。 我已经看到允许 kafka 消费者调用 pause() 或在 loop() 中阻塞。是否可以在 kafka 连接接收器中利用这一点?
【问题讨论】:
【参考方案1】:如果您可以使用某种自动缩放器,一个想法可能是在接收器上收集一些指标并相应地缩放连接器,无论是在工作人员还是在任务级别(在本例中是通过 REST API)。
【讨论】:
【参考方案2】:我已经看到允许 kafka 消费者调用 pause() 或在 loop() 中阻塞。是否可以在 kafka 连接接收器中利用这一点?
原始消费者没有暴露,所以没有。您可以在整个连接器上调用 /pause
,但我不确定此时未刷新的消息会发生什么。
但在 put() 中不阻塞意味着我们必须缓冲数据,这肯定会在某些时候导致 OOM 异常
当然可以,但这确实是保留数据超过必要时间的唯一可行选择。例如,这就是 S3 和 HDFS 连接器的工作方式。
rotate.interval.ms
调用文件提交的时间间隔(以毫秒为单位)...
您的 HTTP 客户端连接可能无论如何都会阻塞以发出请求,不是吗?
另一种方法是让您的 HTTP 服务器嵌入一个 Kafka 消费者,这样它就可以自己轮询消息并在本地对它们进行操作,而无需通过 HTTP 发送请求。
【讨论】:
>您的 HTTP 客户端连接很可能会阻塞以发出请求,不是吗? 如果他们有异步响应,那么我想你不在乎你的记录到达 Kafka 的顺序是什么?以上是关于如何处理 Kafka Connect Sink 中的背压?的主要内容,如果未能解决你的问题,请参考以下文章
使用 Kafka Streams DSL 时如何处理错误和不提交
1 周后如何处理来自分布式日志代理(例如 Kafka)的日志?