spring-kafka —— 生产者消费者重要配置
Posted caoweixiong
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spring-kafka —— 生产者消费者重要配置相关的知识,希望对你有一定的参考价值。
一、生产者配置
# 以逗号分隔的主机:端口对列表,用于建立与Kafka群集的初始连接 spring.kafka.producer.bootstrap-servers=TopKafka1:9092,TopKafka2:9092,TopKafka3:9092 # 设置大于0的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。 spring.kafka.producer.retries=0 # 每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求, # 这有助于提升客户端和服务端之间的性能,此配置控制默认批量大小(以字节为单位),默认值为16384 spring.kafka.producer.batch-size=16384 # producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。这项设置将和producer能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求。 spring.kafka.producer.buffer-memory=33554432 # key的Serializer类,实现了org.apache.kafka.common.serialization.Serializer接口 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer # 值的Serializer类,实现了org.apache.kafka.common.serialization.Serializer接口 spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer # procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下: # acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。 # acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。 # acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。 # 可以设置的值为:all, -1, 0, 1 spring.kafka.producer.acks=-1 # 当向server发出请求时,这个字符串会发送给server。目的是能够追踪请求源头,以此来允许ip/port许可列表之外的一些应用可以发送信息。这项应用可以设置任意字符串,因为没有任何功能性的目的,除了记录和跟踪 spring.kafka.producer.client-id=1 # producer用于压缩数据的压缩类型。默认是无压缩。正确的选项值是none、gzip、snappy。压缩最好用于批量处理,批量处理消息越多,压缩性能越好 spring.kafka.producer.compression-type=snappy
二、消费者配置
# 以逗号分隔的主机:端口对列表,用于建立与Kafka群集的初始连接 spring.kafka.consumer.bootstrap-servers=TopKafka1:9092,TopKafka2:9092,TopKafka3:9092 # 用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group,默认:"" spring.kafka.consumer.group-id=TyyLoveZyy # max.poll.records条数据需要在session.timeout.ms这个时间内处理完,默认:500 spring.kafka.consumer.max-poll-records=500 # 消费超时时间,大小不能超过session.timeout.ms,默认:3000 spring.kafka.consumer.heartbeat-interval=3000 # 如果为真,consumer所fetch的消息的offset将会自动的同步到zookeeper。这项提交的offset将在进程挂掉时,由新的consumer使用,默认:true spring.kafka.consumer.enable-auto-commit=true # consumer自动向zookeeper提交offset的频率,默认:5000 spring.kafka.consumer.auto-commit-interval=5000 # 没有初始化的offset时,可以设置以下三种情况:(默认:latest) # earliest # 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 # latest # 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 # none # topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 spring.kafka.consumer.auto-offset-reset=earliest # 每次fetch请求时,server应该返回的最小字节数。如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。默认:1 spring.kafka.consumer.fetch-min-size=1 # Fetch请求发给broker后,在broker中可能会被阻塞的(当topic中records的总size小于fetch.min.bytes时),此时这个fetch请求耗时就会比较长。这个配置就是来配置consumer最多等待response多久。 spring.kafka.consumer.fetch-max-wait=500 # 消费者进程的标识。如果设置一个人为可读的值,跟踪问题会比较方便。。默认:"" spring.kafka.consumer.client-id=1 # key的反序列化类。实现了org.apache.kafka.common.serialization.Deserializer接口 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 值的反序列化类。实现了org.apache.kafka.common.serialization.Deserializer接口 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
三、其它重要配置
# consumer是通过拉取的方式向服务端拉取数据,当超过指定时间间隔max.poll.interval.ms没有向服务端发送poll()请求,而心跳heartbeat线程仍然在继续,会认为该consumer锁死,就会将该consumer退出group,并进行再分配。默认:300000 spring.kafka.consumer.properties.max.poll.interval.ms=300000 # 会话的超时限制。如果consumer在这段时间内没有发送心跳信息,则它会被认为挂掉了,并且reblance将会产生,必须在[group.min.session.timeout.ms, group.max.session.timeout.ms]范围内。默认:10000 spring.kafka.consumer.properties.session.timeout.ms=10000
以上是关于spring-kafka —— 生产者消费者重要配置的主要内容,如果未能解决你的问题,请参考以下文章
Spring-Kafka:反序列化 kafka 消息时出现问题 - 类不在“受信任的包”中?
使用 Spring-kafka 在 GC/消费者重新平衡时清理 Kafka Metric 计量器
spring-kafka整合:DefaultKafkaProducerFactory默认kafka生产者工厂介绍