Kafka Connect 接收器任务中多久触发一次 put()?

Posted

技术标签:

【中文标题】Kafka Connect 接收器任务中多久触发一次 put()?【英文标题】:How often put() is triggered in Kafka Connect sink tasks? 【发布时间】:2016-12-23 20:32:28 【问题描述】:

我可以控制触发我的 Kafka Connect Sink 任务的 put() 方法的时间间隔吗? Kafka Connect 框架在这方面的预期行为是什么?理想情况下,我想指定,例如,“不要打电话给我,除非你有 X 个新记录/Y 个新字节,或者自上次调用以来经过了 Z 毫秒”。这可能会使接收器任务中的批处理逻辑更简单(引用documentation,“在许多情况下,内部缓冲将很有用,因此可以一次发送整批记录,从而减少将事件插入到下游数据存储)。

【问题讨论】:

【参考方案1】:

今天,只有在 WorkerSinkTask 中调用 deliverMessages 时才会调用来自 SinkTask 的 put。好消息是deliverMessages 唯一发生的时间是在poll 内,因此您应该可以控制通过overriding consumer properties 轮询新记录的频率。

如果您想进行内部缓冲,您可以在其implementation of SinkTask 中查看 HDFSConnector 如何处理此问题。但是,现在,Connect 会立即放入投票返回的所有记录。

综上所述,如果您真的希望在消息到达下游系统之前对其进行批处理,您可以考虑查看offset.flush.interval.ms and offset.flush.timeout.ms,它控制着flush() 的调用频率。

【讨论】:

以上是关于Kafka Connect 接收器任务中多久触发一次 put()?的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Connect JDBC 接收器连接器

Kafka Connect 接收到不在公共模式中的 Redshift 表

Kafka Connect JDBC Sink - 一个接收器配置中每个主题(表)的 pk.fields

如何处理 Kafka Connect Sink 中的背压?

Kafka Connect:一个接收器连接器,用于从一个主题写入多个表

使用 Kafka KSQL AVRO 表作为 Kafka Connect JDBC Sink 源的问题