用于批量操作的 Kafka Elasticsearch 连接器

Posted

技术标签:

【中文标题】用于批量操作的 Kafka Elasticsearch 连接器【英文标题】:Kafka Elasticsearch Connector for bulk operations 【发布时间】:2021-03-26 12:18:10 【问题描述】:

我正在使用 Elasticsearch Sink 连接器对单个记录进行操作(索引、更新、删除)。

Elasticsearch 还有一个 /_bulk 端点,可用于一次创建、更新、索引或删除多条记录。文档here。

Elasticsearch Sink 连接器是否支持这些类型的批量操作?如果是这样,我需要什么配置,或者我可以查看任何示例代码吗?

【问题讨论】:

【参考方案1】:

内部是 Elasticsearch sink 连接器creates a bulk processor,用于批量发送记录。要控制此处理器,您需要配置以下属性:

batch.size:写入 Elasticsearch 时要作为批处理的记录数。 max.in.flight.requests:在阻止更多请求之前,可以发送到 Elasticsearch 的最大索引请求数。 max.buffered.records:在阻止接受更多记录之前,每个任务将缓冲的最大记录数。此配置可用于限制每个任务的内存使用量。 linger.ms:根据batch.size 配置,在请求传输之间到达的记录被批处理到单个批量索引请求中。通常,这仅在记录到达速度快于发送速度时才会在负载下发生。然而,即使在轻负载下也可能需要减少请求的数量并从批量索引中受益。此设置有助于实现这一点 - 当待处理的批次未满时,任务将等待给定的延迟时间,以允许添加其他记录,以便将它们批处理到单个请求中,而不是立即将其发送出去。 flush.timeout.ms:用于定期刷新的超时时间(以毫秒为单位),以及在添加记录时等待已完成请求提供缓冲区空间时的超时时间。如果超过此超时,任务将失败。

【讨论】:

以上是关于用于批量操作的 Kafka Elasticsearch 连接器的主要内容,如果未能解决你的问题,请参考以下文章

KAFKA SINK CONNECT: WARN 批量请求 167 失败。重试请求

Kafka数据丢失分析

kafka的关键特征

kafka学习:kafka简单命令操作&springboot+kafka

Kafka Connect with MSSQL 不适用于删除操作

Kafka Connect - 不适用于更新操作