Kafka Connect 接收器任务中多久触发一次 put()?
Posted
技术标签:
【中文标题】Kafka Connect 接收器任务中多久触发一次 put()?【英文标题】:How often put() is triggered in Kafka Connect sink tasks? 【发布时间】:2016-12-23 20:32:28 【问题描述】:我可以控制触发我的 Kafka Connect Sink 任务的 put()
方法的时间间隔吗? Kafka Connect 框架在这方面的预期行为是什么?理想情况下,我想指定,例如,“不要打电话给我,除非你有 X 个新记录/Y 个新字节,或者自上次调用以来经过了 Z 毫秒”。这可能会使接收器任务中的批处理逻辑更简单(引用documentation,“在许多情况下,内部缓冲将很有用,因此可以一次发送整批记录,从而减少将事件插入到下游数据存储)。
【问题讨论】:
【参考方案1】:今天,只有在 WorkerSinkTask
中调用 deliverMessages 时才会调用来自 SinkTask
的 put。好消息是deliverMessages
唯一发生的时间是在poll 内,因此您应该可以控制通过overriding consumer properties 轮询新记录的频率。
如果您想进行内部缓冲,您可以在其implementation of SinkTask 中查看 HDFSConnector 如何处理此问题。但是,现在,Connect 会立即放入投票返回的所有记录。
综上所述,如果您真的希望在消息到达下游系统之前对其进行批处理,您可以考虑查看offset.flush.interval.ms and offset.flush.timeout.ms,它控制着flush()
的调用频率。
【讨论】:
以上是关于Kafka Connect 接收器任务中多久触发一次 put()?的主要内容,如果未能解决你的问题,请参考以下文章
Kafka Connect 接收到不在公共模式中的 Redshift 表
Kafka Connect JDBC Sink - 一个接收器配置中每个主题(表)的 pk.fields