Spring Kafka configuration parameters explained

# Kafka broker addresses (bootstrap servers)

spring.kafka.bootstrap-servers=10.125.70.41:9092,10.125.70.35:9092,10.125.70.36:9092

#client-id

spring.kafka.client-id=group1

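Producer parameters

# acks=0 : the producer does not wait for any response from the server before treating the write as successful.

# acks=1 : the producer receives a success response as soon as the partition leader has received the message.

# acks=all : the producer receives a success response only after all in-sync replicas have received the message.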

spring.kafka.producer.acks=1

# When several messages are bound for the same partition, the producer groups them into one batch. This parameter sets the memory a single batch may use, in bytes.

spring.kafka.producer.batch-size=16384

# Number of times a message is resent after a send error.

spring.kafka.producer.retries=3

# Size of the producer's in-memory buffer, in bytes.

spring.kafka.producer.buffer-memory=33554432

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
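As a rough illustration of what the producer properties above amount to, the following sketch builds the equivalent map keyed by the native Kafka producer configuration names. It uses only the JDK and plain string keys; the class name `ProducerSettingsSketch` is hypothetical, and in a real application Spring Boot assembles this map for you.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch only: the native Kafka producer keys behind the Spring Boot properties above. */
final class ProducerSettingsSketch {

    /** Builds a map keyed by native Kafka producer configuration names. */
    static Map<String, Object> producerConfig() {
        Map<String, Object> config = new LinkedHashMap<>();
        config.put("bootstrap.servers", "10.125.70.41:9092,10.125.70.35:9092,10.125.70.36:9092");
        config.put("client.id", "group1");        // spring.kafka.client-id
        config.put("acks", "1");                  // spring.kafka.producer.acks
        config.put("batch.size", 16384);          // spring.kafka.producer.batch-size (bytes)
        config.put("retries", 3);                 // spring.kafka.producer.retries
        config.put("buffer.memory", 33554432L);   // spring.kafka.producer.buffer-memory (32 MiB)
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return config;
    }

    public static void main(String[] args) {
        producerConfig().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

A map like this is what a Kafka producer constructor ultimately receives, which is why each Spring Boot key has a dotted native counterpart.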

Consumer parameters

# Interval between automatic offset commits (ms); only used when enable-auto-commit is true.

spring.kafka.consumer.auto-commit-interval=1000

# Where to start consuming when no committed offset exists (earliest, latest, or none).

spring.kafka.consumer.auto-offset-reset=latest

# Whether offsets are committed automatically.

spring.kafka.consumer.enable-auto-commit=false

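# Maximum allowed interval between poll() calls (ms). Spring Boot has no dedicated key for this setting, so it is passed through as a native client property.

spring.kafka.consumer.properties.max.poll.interval.ms=600000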

# Session timeout (ms), also passed through as a native client property.

spring.kafka.consumer.properties.session.timeout.ms=10000

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Consumer group name

spring.kafka.consumer.group-id=dmsdecision

# Maximum number of records returned by a single poll()

spring.kafka.consumer.max-poll-records=30

# Heartbeat interval (ms)

spring.kafka.consumer.heartbeat-interval=3000
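The consumer timing values above are related to each other. The Kafka documentation recommends keeping the heartbeat interval at no more than one third of the session timeout, and each poll() batch has to be processed within max.poll.interval.ms. The sketch below checks both relationships with the numbers from this listing; the class and method names are hypothetical.

```java
/** Sketch only: sanity checks on the consumer timing values listed above. */
final class ConsumerTimingSketch {

    /** True when the heartbeat interval is at most one third of the session timeout. */
    static boolean heartbeatWithinOneThird(long heartbeatMs, long sessionTimeoutMs) {
        return heartbeatMs * 3 <= sessionTimeoutMs;
    }

    /** Average processing time available per record before the poll interval is exceeded. */
    static long perRecordBudgetMs(long maxPollIntervalMs, int maxPollRecords) {
        return maxPollIntervalMs / maxPollRecords;
    }

    public static void main(String[] args) {
        // heartbeat-interval=3000, session.timeout.ms=10000
        System.out.println(heartbeatWithinOneThird(3000, 10000)); // prints true
        // max.poll.interval.ms=600000, max-poll-records=30
        System.out.println(perRecordBudgetMs(600000, 30));        // prints 20000
    }
}
```

With these settings a listener has on average 20 seconds per record before the consumer risks being removed from the group.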

# Number of listener threads. This value takes effect when spring.kafka.properties.parsefileContainerFactory_concurrency is not set.

spring.kafka.listener.concurrency=30

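# MANUAL : after each batch returned by poll() has been processed by the listener (ListenerConsumer), commit once Acknowledgment.acknowledge() has been called manually

# MANUAL_IMMEDIATE : commit immediately when Acknowledgment.acknowledge() is called manually

# RECORD : commit after each record has been processed by the listener (ListenerConsumer)

# BATCH : commit after each batch returned by poll() has been processed by the listener (ListenerConsumer)

# TIME : after each batch returned by poll() has been processed, commit if more than TIME has elapsed since the last commit

# COUNT : after each batch returned by poll() has been processed, commit if the number of processed records is greater than or equal to COUNT

# COUNT_TIME : commit when either the TIME or the COUNT condition is met

# Acknowledgment mode; one of the values listed above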

spring.kafka.listener.ack-mode=manual_immediate

# Set when ack-mode is COUNT or COUNT_TIME

spring.kafka.listener.ack-count=

# Set when ack-mode is TIME or COUNT_TIME

spring.kafka.listener.ack-time=

# Timeout for poll() when fetching data (ms)

spring.kafka.listener.poll-timeout=
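The COUNT, TIME and COUNT_TIME rules described above can be expressed as one small decision function. This is a simplified model of the rules as stated in the comments, not the container's actual implementation. The thresholds used in `main` (10 records, 5000 ms) are hypothetical, since the listing leaves ack-count and ack-time empty.

```java
/** Sketch only: a simplified model of the COUNT / TIME / COUNT_TIME commit rules. */
final class AckModeSketch {

    enum Mode { COUNT, TIME, COUNT_TIME }

    /** Decides whether to commit after a poll() batch has been processed. */
    static boolean shouldCommit(Mode mode, int processedSinceCommit, long millisSinceCommit,
                                int ackCount, long ackTimeMs) {
        boolean countReached = processedSinceCommit >= ackCount; // COUNT: greater than or equal
        boolean timeReached = millisSinceCommit > ackTimeMs;     // TIME: strictly greater
        switch (mode) {
            case COUNT:
                return countReached;
            case TIME:
                return timeReached;
            case COUNT_TIME:
                return countReached || timeReached;
            default:
                throw new IllegalArgumentException("Unsupported mode: " + mode);
        }
    }

    public static void main(String[] args) {
        // Hypothetical thresholds: ack-count=10, ack-time=5000 ms.
        System.out.println(shouldCommit(Mode.COUNT, 10, 100, 10, 5000));      // prints true
        System.out.println(shouldCommit(Mode.TIME, 3, 4000, 10, 5000));       // prints false
        System.out.println(shouldCommit(Mode.COUNT_TIME, 3, 6000, 10, 5000)); // prints true
    }
}
```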

Spring Kafka rebalance clarification

[Title]: Spring Kafka rebalance clarification [Posted]: 2019-02-27 08:56:57 [Question]:

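I am just trying to understand Kafka rebalancing. Here is my listener method. I have configured a RetryTemplate with the consumer factory to retry 20 times with a back-off delay of 20 seconds. I am using spring-kafka 1.2.2 (we plan to upgrade the client) with manual acknowledgment.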

@KafkaListener(id = "${kafka.listener-id}", topics = "${kafka.topic}")
public void listen(final ConsumerRecord<String, String> consumerRecord,
                   final Acknowledgment acknowledgment) throws ServiceResponseException {

    // Always throw, so that every delivery goes through the retry path.
    if (true) {
        System.out.println("throwing exception ");
        throw new RuntimeException();
    }

    // Never reached at runtime because of the unconditional throw above.
    try {
        acknowledgment.acknowledge();
        LOGGER.info("Kafka acknowledgment sent for Transaction ID:");
    } catch (Exception e) {
        LOGGER.info("Exception encountered when acking record with transaction id: ");
    }
}

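I have 2 workers, each with a concurrency of 2. On Kafka I have 3 partitions. I started one worker, and all 3 partitions were assigned to worker1. Then I sent a message. The listener threw a RuntimeException, and this happens 20 times with a 20-second delay. Then, when I started worker2, a Kafka rebalance was triggered, but the partitions were not assigned yet. worker1 failed with the message "Error while processing: ConsumerRecord" (after getContainerProperties().getShutdownTimeout()), and then all consumers joined the group. Now the same message was delivered to worker2.

1) This works the way I need it to. But I have a question: when the rebalance is triggered, why does partition assignment not happen immediately? Instead it waits for worker1 to stop completely (waiting for getContainerProperties().getShutdownTimeout()), and only then do all consumers of worker1 and worker2 join the group.

2) During the rebalance I observed that the consumers stop calling poll (see the logs below). Is that correct?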
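To put the retry settings from the question in perspective, the sketch below estimates how long a listener thread can stay busy on a single failing record. It assumes a fixed back-off that sleeps between attempts but not after the last one, and it assumes "20 times" means 20 attempts in total. Both are assumptions about the asker's configuration, and the class name is hypothetical.

```java
/** Sketch only: total sleep time for a fixed back-off retry, under the assumptions stated above. */
final class RetryWindowSketch {

    /** Time spent sleeping between attempts; there is no sleep after the final attempt. */
    static long totalBackOffMs(int maxAttempts, long backOffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        return (maxAttempts - 1) * backOffMs;
    }

    public static void main(String[] args) {
        // 20 attempts with a 20-second fixed back-off.
        System.out.println(totalBackOffMs(20, 20_000L)); // prints 380000
    }
}
```

Under these assumptions the listener thread is occupied for more than six minutes per failing record, so it is very likely to be in the middle of a back-off sleep when another worker joins the group.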

Trace log from worker1:

2018-09-23 13:52:53.259 TRACE 6384 --- [ listener-2-L-1] essageListenerContainer$ListenerConsumer : No records to process
2018-09-23 13:52:53.259 TRACE 6384 --- [ listener-0-L-1] essageListenerContainer$ListenerConsumer : No records to process
2018-09-23 13:52:53.384 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:52:53.384 TRACE 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:52:53.977 DEBUG 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:52:53.977 TRACE 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:52:54.008 DEBUG 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:52:54.008 TRACE 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:52:54.023  INFO 6384 --- [ listener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [messages-0] for group mris-group
2018-09-23 13:52:54.023 TRACE 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received partition revocation notification, and will stop the invoker.
2018-09-23 13:52:54.023 DEBUG 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Stopping invoker
2018-09-23 13:52:54.081  INFO 6384 --- [ listener-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [messages-1] for group mris-group
2018-09-23 13:52:54.081 TRACE 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Received partition revocation notification, and will stop the invoker.
2018-09-23 13:52:54.081 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Stopping invoker
2018-09-23 13:52:54.241  INFO 6384 --- [ listener-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [messages-2] for group mris-group
2018-09-23 13:52:54.241 TRACE 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Received partition revocation notification, and will stop the invoker.
2018-09-23 13:52:54.241 DEBUG 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Stopping invoker
2018-09-23 13:52:54.264 DEBUG 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Invoker stopped
2018-09-23 13:52:54.264 DEBUG 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Invoker stopped
2018-09-23 13:52:54.264  INFO 6384 --- [ listener-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[messages-2]
2018-09-23 13:52:54.264  INFO 6384 --- [ listener-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[messages-0]
2018-09-23 13:52:54.264  INFO 6384 --- [ listener-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group mris-group
2018-09-23 13:52:54.265  INFO 6384 --- [ listener-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group mris-group
2018-09-23 13:53:09.355 DEBUG 6384 --- [ listener-1-L-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=<removed>]
throwing exception
2018-09-23 13:53:24.083 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Interrupting invoker
2018-09-23 13:53:24.083 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Invoker stopped
2018-09-23 13:53:24.085  INFO 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Invoker timed out while waiting for shutdown and will be canceled.
2018-09-23 13:53:24.085  INFO 6384 --- [ listener-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[messages-1]
2018-09-23 13:53:24.085  INFO 6384 --- [ listener-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group mris-group
2018-09-23 13:53:24.101 ERROR 6384 --- [ listener-1-L-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = messages, partition = 1, offset = 0, CreateTime = 1537725149052, checksum = 3567644394, serialized key size = 27, serialized value size = 1952, key = test_hotfix1@test.com, value = <removed>])

org.springframework.retry.backoff.BackOffInterruptedException: Thread interrupted while sleeping; nested exception is java.lang.InterruptedException: sleep interrupted
    at org.springframework.retry.backoff.FixedBackOffPolicy.doBackOff(FixedBackOffPolicy.java:86) ~[spring-retry-1.2.0.RELEASE.jar:na]
    at org.springframework.retry.backoff.StatelessBackOffPolicy.backOff(StatelessBackOffPolicy.java:36) ~[spring-retry-1.2.0.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:305) ~[spring-retry-1.2.0.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:179) ~[spring-retry-1.2.0.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:73) ~[spring-kafka-1.2.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:39) ~[spring-kafka-1.2.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.FilteringAcknowledgingMessageListenerAdapter.onMessage(FilteringAcknowledgingMessageListenerAdapter.java:55) ~[spring-kafka-1.2.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.FilteringAcknowledgingMessageListenerAdapter.onMessage(FilteringAcknowledgingMessageListenerAdapter.java:34) ~[spring-kafka-1.2.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:794) [spring-kafka-1.2.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:738) [spring-kafka-1.2.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2200(KafkaMessageListenerContainer.java:245) [spring-kafka-1.2.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:1031) [spring-kafka-1.2.2.RELEASE.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_162]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_162]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
Caused by: java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method) [na:1.8.0_162]
    at org.springframework.retry.backoff.ThreadWaitSleeper.sleep(ThreadWaitSleeper.java:30) ~[spring-retry-1.2.0.RELEASE.jar:na]
    at org.springframework.retry.backoff.FixedBackOffPolicy.doBackOff(FixedBackOffPolicy.java:83) ~[spring-retry-1.2.0.RELEASE.jar:na]
    ... 14 common frames omitted

2018-09-23 13:53:24.101  INFO 6384 --- [ listener-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group mris-group with generation 10
2018-09-23 13:53:24.101  INFO 6384 --- [ listener-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group mris-group with generation 10
2018-09-23 13:53:24.102  INFO 6384 --- [ listener-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group mris-group with generation 10
2018-09-23 13:53:24.102  INFO 6384 --- [ listener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [messages-0] for group mris-group
2018-09-23 13:53:24.102  INFO 6384 --- [ listener-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [messages-2] for group mris-group
2018-09-23 13:53:24.102  INFO 6384 --- [ listener-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [] for group mris-group
2018-09-23 13:53:24.103 DEBUG 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Committing: {}
2018-09-23 13:53:24.103  INFO 6384 --- [ listener-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[]
2018-09-23 13:53:24.103 DEBUG 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {messages-0=OffsetAndMetadata{offset=0, metadata=''}}
2018-09-23 13:53:24.104 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Committing: {messages-2=OffsetAndMetadata{offset=0, metadata=''}}
2018-09-23 13:53:24.106  INFO 6384 --- [ listener-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[messages-0]
2018-09-23 13:53:24.107 DEBUG 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.107 TRACE 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.108  INFO 6384 --- [ listener-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[messages-2]
2018-09-23 13:53:24.108 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.108 TRACE 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.207 DEBUG 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.207 TRACE 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:25.111 TRACE 6384 --- [ listener-0-L-2] essageListenerContainer$ListenerConsumer : No records to process
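The length of the stall on listener-1 can be read directly from the worker1 timestamps above: partition messages-1 was revoked at 13:52:54.081 and the invoker was interrupted at 13:53:24.083. The sketch below computes that gap with `java.time`; the class name is hypothetical, and the idea that this gap corresponds to the container's shutdown timeout comes from the question, since the configured value is not shown.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Sketch only: measures the gap between two timestamps copied from the log above. */
final class LogGapSketch {

    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss.SSS");

    /** Milliseconds elapsed between two log timestamps. */
    static long gapMillis(String earlier, String later) {
        LocalDateTime start = LocalDateTime.parse(earlier, LOG_TIME);
        LocalDateTime end = LocalDateTime.parse(later, LOG_TIME);
        return Duration.between(start, end).toMillis();
    }

    public static void main(String[] args) {
        // Revocation of messages-1 versus "Interrupting invoker" on listener-1-C-1.
        System.out.println(gapMillis("2018-09-23 13:52:54.081", "2018-09-23 13:53:24.083")); // prints 30002
    }
}
```

The other two consumers rejoined at 13:52:54.264 and 13:52:54.265, but none of them logged "Successfully joined group" until 13:53:24.101, right after listener-1 rejoined.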

Trace log from worker2:

2018-09-23 13:53:24.102  INFO 6401 --- [ listener-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group mris-group with generation 10
2018-09-23 13:53:24.104  INFO 6401 --- [ listener-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group mris-group with generation 10
2018-09-23 13:53:24.105  INFO 6401 --- [ listener-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [] for group mris-group
2018-09-23 13:53:24.105  INFO 6401 --- [ listener-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [] for group mris-group
2018-09-23 13:53:24.105 DEBUG 6401 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Committing: {}
2018-09-23 13:53:24.105 DEBUG 6401 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Committing: {}
2018-09-23 13:53:24.105  INFO 6401 --- [ listener-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[]
2018-09-23 13:53:24.105  INFO 6401 --- [ listener-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[]
2018-09-23 13:53:24.106  INFO 6401 --- [ listener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [messages-1] for group mris-group
2018-09-23 13:53:24.111 DEBUG 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {messages-1=OffsetAndMetadata{offset=0, metadata=''}}
2018-09-23 13:53:24.115  INFO 6401 --- [ listener-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[messages-1]
2018-09-23 13:53:24.118 DEBUG 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.118 TRACE 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.189 DEBUG 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received: 1 records
2018-09-23 13:53:24.189 TRACE 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.202 TRACE 6401 --- [ listener-0-L-1] essageListenerContainer$ListenerConsumer : Processing ConsumerRecord(topic = messages, partition = 1, offset = 0, CreateTime = 1537725149052, checksum = 3567644394, serialized key size = 27, serialized value size = 1952, key = test_hotfix1@test.com, value = <removed>)
2018-09-23 13:53:24.209 DEBUG 6401 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.209 DEBUG 6401 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.209 TRACE 6401 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.210 TRACE 6401 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.216 DEBUG 6401 --- [ listener-0-L-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=<removed>]
throwing exception 
2018-09-23 13:53:25.194 DEBUG 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:25.194 TRACE 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...

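[Question comments]:

[Answer 1]:

Versions before 1.3 had a very complicated threading model to avoid rebalances caused by slow listeners. KIP-62 allowed us to use a much simpler threading model in 1.3 and later.

1.2.x is no longer supported, and I do not have the time (or the inclination) to go back and figure out what is happening. Please upgrade to 1.3.7 (or, even better, 2.1.10).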

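[Comments]:

Thanks Gary. I think I have found the reason this happens. According to the javadoc on ConsumerRebalanceListener: "It is guaranteed that all consumer processes will invoke onPartitionsRevoked prior to any process invoking onPartitionsAssigned. So if offsets or other state is saved in the onPartitionsRevoked call it is guaranteed to be saved by the time the process taking over that partition has their onPartitionsAssigned callback called to load the state." So I think this is why the partitions were not assigned: worker1 was still executing onPartitionsRevoked. Gary, do you think the same holds for newer versions of spring-kafka (>2.1.0)?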
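The guarantee quoted from the ConsumerRebalanceListener javadoc behaves like a barrier: no member is assigned partitions until every member has finished revoking. The sketch below models that with a `CountDownLatch` and plain threads. It is a toy simulation rather than Kafka's group protocol, and the class name and delays are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/** Sketch only: models "all members revoke before any member is assigned" as a barrier. */
final class RebalanceBarrierSketch {

    /** Runs one simulated rebalance and returns the events in the order they occurred. */
    static List<String> simulate(long[] revokeDelaysMs) {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch allRevoked = new CountDownLatch(revokeDelaysMs.length);
        List<Thread> members = new ArrayList<>();
        for (int i = 0; i < revokeDelaysMs.length; i++) {
            final int id = i;
            final long delay = revokeDelaysMs[i];
            Thread member = new Thread(() -> {
                try {
                    Thread.sleep(delay);          // time needed to stop in-flight work
                    events.add("revoked:" + id);
                    allRevoked.countDown();
                    allRevoked.await();           // wait until every member has revoked
                    events.add("assigned:" + id);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            members.add(member);
            member.start();
        }
        for (Thread member : members) {
            try {
                member.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for members", e);
            }
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        // Hypothetical delays in milliseconds: two fast members and one slow member.
        simulate(new long[] {5, 10, 200}).forEach(System.out::println);
    }
}
```

Every "revoked" event is recorded before the first "assigned" event, so the slowest member sets the pace for the whole group. That matches the worker1 log, where the fast consumers waited for listener-1 before joining generation 10.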
