KAFKA SINK CONNECT: WARN Bulk request 167 failed. Retrying request
Posted: 2021-12-08 06:14:10

Question: I have a data pipeline in which an input topic, a Kafka Streams application, and an output topic feed a sink connector into Elasticsearch.
At the start of the run, data ingestion is fine, but once the process has been running for a while, ingestion into Elasticsearch from the connector starts failing. I have been checking all the worker logs and found the following messages, which I suspect may be the cause:
[2021-10-21 11:22:14,246] WARN Bulk request 168 failed. Retrying request. (io.confluent.connect.elasticsearch.ElasticsearchClient:335)
java.net.SocketTimeoutException: 3,000 milliseconds timeout on connection http-outgoing-643 [ACTIVE]
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387)
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492)
at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
at java.base/java.lang.Thread.run(Thread.java:829)
[2021-10-21 11:27:23,858] INFO [Consumer clientId=connector-consumer-ElasticsearchSinkConnector-topic01-0, groupId=connect-ElasticsearchSinkConnector-topic01] Member connector-consumer-ElasticsearchSinkConnector-topic01-0-41b68d34-0f00-4887-b54e-79561fffb5e5 sending LeaveGroup request to coordinator kafka1:9092 (id: 2147483646 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1042)
I have tried changing the connector configuration, but I don't understand the root cause of the problem well enough to fix it.
Connector configuration:
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
connection.password=xxxxx
topics=output_topic
value.converter.schemas.enable=false
connection.username=user-x
name=ElasticsearchSinkConnector-output_topic
connection.url=xxxxxxx
value.converter=org.apache.kafka.connect.json.JsonConverter
key.ignore=true
key.converter=org.apache.kafka.connect.storage.StringConverter
schema.ignore=true
Can these bulk-request warnings cause data loss? Any help would be appreciated.
Answer 1: You can try adding
"flush.timeout.ms": 30000
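More generally, the `3,000 milliseconds timeout` in the log matches the connector's default `read.timeout.ms` of 3000 ms, so raising the client-side timeouts and retry budget is a reasonable first step. A sketch of the relevant Elasticsearch sink connector settings follows; the values are illustrative, not prescriptive, and should be tuned to your cluster:

```json
{
  "read.timeout.ms": "30000",
  "connection.timeout.ms": "5000",
  "flush.timeout.ms": "30000",
  "max.retries": "10",
  "retry.backoff.ms": "1000",
  "batch.size": "500"
}
```

Reducing `batch.size` makes each bulk request smaller and faster, which also helps with the second log message: the consumer left the group because a poll cycle exceeded `max.poll.interval.ms`. That limit can be raised per connector with `consumer.override.max.poll.interval.ms`, provided the Connect worker is started with `connector.client.config.override.policy=All`.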