用于批量操作的 Kafka Elasticsearch 连接器
Posted
技术标签:
【中文标题】用于批量操作的 Kafka Elasticsearch 连接器【英文标题】:Kafka Elasticsearch Connector for bulk operations 【发布时间】:2021-03-26 12:18:10 【问题描述】:我正在使用 Elasticsearch Sink 连接器对单个记录进行操作(索引、更新、删除)。
Elasticsearch 还有一个 /_bulk 端点,可用于一次创建、更新、索引或删除多条记录。文档here。
Elasticsearch Sink 连接器是否支持这些类型的批量操作?如果是这样,我需要什么配置,或者我可以查看任何示例代码吗?
【问题讨论】:
【参考方案1】:内部是 Elasticsearch sink 连接器creates a bulk processor,用于批量发送记录。要控制此处理器,您需要配置以下属性:
batch.size
:写入 Elasticsearch 时要作为批处理的记录数。
max.in.flight.requests
:在阻止更多请求之前,可以发送到 Elasticsearch 的最大索引请求数。
max.buffered.records
:在阻止接受更多记录之前,每个任务将缓冲的最大记录数。此配置可用于限制每个任务的内存使用量。
linger.ms
:根据batch.size
配置,在请求传输之间到达的记录被批处理到单个批量索引请求中。通常,这仅在记录到达速度快于发送速度时才会在负载下发生。然而,即使在轻负载下也可能需要减少请求的数量并从批量索引中受益。此设置有助于实现这一点 - 当待处理的批次未满时,任务将等待给定的延迟时间,以允许添加其他记录,以便将它们批处理到单个请求中,而不是立即将其发送出去。
flush.timeout.ms
:用于定期刷新的超时时间(以毫秒为单位),以及在添加记录时等待已完成请求提供缓冲区空间时的超时时间。如果超过此超时,任务将失败。
【讨论】:
以上是关于用于批量操作的 Kafka Elasticsearch 连接器的主要内容,如果未能解决你的问题,请参考以下文章
KAFKA SINK CONNECT: WARN 批量请求 167 失败。重试请求
kafka学习:kafka简单命令操作&springboot+kafka