Kafka Connect - 无法刷新,等待生产者刷新未完成的消息时超时
Posted
技术标签:
【中文标题】Kafka Connect - 无法刷新,等待生产者刷新未完成的消息时超时【英文标题】:Kafka Connect - Failed to flush, timed out while waiting for producer to flush outstanding messages 【发布时间】:2019-08-26 12:52:36 【问题描述】:我正在尝试在 BULK 模式下使用具有以下属性的 Kafka Connect JDBC 源连接器。
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
timestamp.column.name=timestamp
connection.password=XXXXX
validate.non.null=false
tasks.max=1
producer.buffer.memory=2097152
batch.size=1000
producer.enable.idempotence=true
offset.flush.timeout.ms=300000
table.types=TABLE,VIEW
table.whitelist=materials
offset.flush.interval.ms=5000
mode=bulk
topic.prefix=mysql-
connection.user=kafka_connect_user
poll.interval.ms=200000
connection.url=jdbc:mysql://<DBNAME>
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
我收到以下关于提交偏移量的错误,更改各种参数似乎效果不大。
[2019-04-04 12:42:14,886] INFO WorkerSourceTaskid=SapMaterialsConnector-0 flushing 4064 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-04-04 12:42:19,886] ERROR WorkerSourceTaskid=SapMaterialsConnector-0 Failed to flush, timed out while waiting for producer to flush outstanding 712 messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
【问题讨论】:
您如何运行 Confluent Platform?在你的笔记本电脑上?您看到的消息表明向代理发送消息存在问题。例如,如果它被重载。 Kafka 是在 Azure 上的 HDInsight 上作为托管服务运行的。集群有三个代理。 “批量”数据的大小小于 20 MB。有什么好的指南可以检查集群是否过载? 【参考方案1】:该错误表示有大量消息缓冲,在达到超时之前无法刷新。
要解决这个问题,你可以
在您的 Kafka Connect Worker Configs 中增加offset.flush.timeout.ms
配置参数
或者您可以通过减少 Kafka Connect Worker 配置中的 producer.buffer.memory
来减少缓冲的数据量。当您有相当大的消息时,这将成为最佳选择。
【讨论】:
producer.buffer.memory=2097152 offset.flush.timeout.ms=300000 所以2兆缓冲区和5分钟超时,这些真的是不合适的值吗?我不能在 5 分钟内发送 2 兆字节? 我面临同样的问题,我已经尝试了上述答案,但它对我没有帮助。我在这里提到了有关我的问题的完整详细信息:github.com/confluentinc/kafka-connect-jdbc/issues/… 请提供任何 cmets 你知道这个属性是如何为 kafka connect worker 调用的吗?我们的融合云实例在“producer.override.offset.flush.timeout.ms”和“offset.flush.timeout.ms”下无法识别。【参考方案2】:启用security.protocol=SSL
时,请确保 Connect 工作人员和 Connect 生产者有单独的 SSL 参数。
为两者提供 SSL 设置
# Authentication settings for Connect workers
ssl.keystore.location=/var/private/ssl/kafka.worker.keystore.jks
ssl.keystore.password=worker1234
ssl.key.password=worker1234
# Authentication settings for Connect producers used with source connectors
producer.ssl.keystore.location=/var/private/ssl/kafka.source.keystore.jks
producer.ssl.keystore.password=connector1234
producer.ssl.key.password=connector1234
见https://docs.confluent.io/5.2.3/connect/security.html#separate-principals
【讨论】:
【参考方案3】:如果您尝试连接融合云,则此错误可能是因为工作人员属性中缺少配置,请确保您添加了生产者和消费者配置。
consumer.ssl.endpoint.identification.algorithm=https
consumer.sasl.mechanism=PLAIN
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=500
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="API_KEY" password="SECRET";
consumer.security.protocol=SASL_SSL
producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.request.timeout.ms=20000
producer.retry.backoff.ms=500
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="API_KEY" password="SECRET";
producer.security.protocol=SASL_SSL
【讨论】:
【参考方案4】:我不知道这是否对某人有帮助。我在使用 Oracle Connector CDC 时遇到了同样的错误,错误是因为该表没有主键。我添加了主键,效果很好。
【讨论】:
以上是关于Kafka Connect - 无法刷新,等待生产者刷新未完成的消息时超时的主要内容,如果未能解决你的问题,请参考以下文章
将 kafka-connect-transform-archive 与 HdfsSinkConnector 一起使用时的刷新大小
无法在 kafka connect docker 映像中运行 kafka connect datagen