kafka常用生产者消费者配置
Posted liclblog
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka常用生产者消费者配置相关的知识,希望对你有一定的参考价值。
属性名 | 默认值 | 描述 |
message.send.max.retries | 3 |
设置当生产者向代理发信息时,若代理由于各种原因导致接 受失败,生产者在丢弃该消息前进行重试的次数。 |
retry.backoff.ms | 100 |
在生产者每次重试之前,生产者会更新主题的 MetaData 信息, 以此来检测新的 Lea er 是否己选举出来。因为选举 Leader需要一定时间 ,所以此选项指定更新主题的 MetaData 之前生产 者需要等待的时间,单位为 ms。 |
queue.buffering.max.ms | 1000 |
在异步模式下 表示消息被缓存的最长时间,单位为 ms ,当 到达该时间后消息将开始批量发送,若在异步模式下同时配 置了缓存数据的最大值 batch num.messages ,则达到这两个阙值之一都将开始批量发送消息。 |
queue.buffering.max.messages | 10000 |
在异步模式下,在生产者必须被阻塞或者数据必须丢失之前, 可以缓存到队列中的未发送的最大消息条数,即初始化消息 队列的长度。 |
batch.num.messages | 200 | 在异步模式下每次批量发送消息的最大消息数 |
request. timeout.ms | 1500 | 当需要 acks 时, 生产者等待代理应答的超时时间,单位为 ms 若在该时间范围内还没有收到应答,则会发送错误到客户端。 |
send.buffer.bytes | 100kb | Socket 发送缓冲区大小 |
topic.metadata.refresh.interval.ms | 5min | 生产者定时请求更新主题元数据的时间间隔。若设置为 0,则 在每个消息发送后都去请求更新数据 |
client.id | console-producer | 生产者指定的一个标识字段,在每次请求中包含该字段,用来追 踪调用,根据该字段在逻辑上可以确认是哪个应用发出的请求 |
queue.enqueue.timeout.ms | 2147483647 | 该值为0表示当队列没满时直接入队 满了则立即丢弃,负数表示无条件阻塞且不丢弃,正数表示阻塞达到该值时长后 抛出 QueueFul!Exception 异常 |
属姓名 | 默认值 | 描述 |
group.id | / | 消费组 id ,新版本消费者必须由客户端指定 |
client.id | / | KafkaConsumer 对应的客户端 id ,客户端可以不指定, Kafka 会自 动生成一个 clientld 字符串 |
key.deserializer | / | 消息的 Key 反序列化类,需要实现 org.apache.ka fka.common.seria lization.Deserializer 接口 |
value.deserializer | / | 消息的 Value 反序列化类,需要实现 org.apache.ka f ka.common. serialization.Deserializer 接口 |
enable.auto.commit | true | 是否开启自动提交消费偏移量 |
max.poll.records | 500 | 一次拉取消息的最大数量 |
max.poll.interval.ms | 300000 ms | 当通过消费组管理消费者时,该配置指定拉取消息线程最长空闲时间,若超过这个时间间隔还没有发起 poll 操作,则消费组认为 该消费者己离开了消费组,将进行平衡操作 |
send.buffer.bytes | 128 KB | Socket 发送消息缓冲区大小 |
receive.buffer.bytes | 64 KB | Socket 接收消息缓冲区大小 |
fetch.min.bytes | 1 | 一次拉取操作等待消息的最小字节数 |
fetch.max.bytes | 50 MB | 一次拉取操作获取消息的最大字节数 |
session.timeout.ms | 10000 ms | 与ZooKeeper 会话超时时间,当通过消费组管理消费者时,如果 在该配置的时间内组协调器没有收到消费者发来的心跳请求,则 协调器会将该消费者从消费组中移除 |
request.timeout.ms | 305000 ms | 客户端发送请求后等待回应的超时时间 |
heartbeat.interval.ms | 3000 ms | 发送心跳请求的时间间隔 |
auto.commit.interval.ms | 5000 ms | 自动提交消费偏移量的时间间隔 |
fetch.max.wait.ms | 500 ms | 若是不满足 fetch.min )吃es 时,客户端等待请求的最长等待时间 |
ConsumerConfig :消费者级别的配置,将相应配置传递给其他组件。
SubscriptionState :维护了消费者订阅和消费消息的情况 该类定义了 系列用于保存 订阅信息的字段,主要宇段描述如下。
subscription :用来保存客户端通过 KafkaConsumer subscribeO方法所订阅的主题列表
subscribedPattem :用来保存通过模式匹配订阅主题的模式。
user Assignment:用来保存客户端通过 KafkaConsumer.assignO方法所订阅的分区列表
groupSubscription :用来保存该消费者当前订阅的主题列表
assignment:用来保存消费者对所订阅的每个主题分区的消费情况
SubscriptionSjate 类内部定义了一个私有的枚举类型 SubscriptionType ,该枚举类定义了
消费者订阅消息的四种模式 其中
NONE表示初始状态还没有订阅任何主题,
AUTO TOPICS 表示按主题名订阅且由指定的分区分配策略自动进行分区与消费者的 映射,
AUTO PATTERN 表示以正则表达式形式指定消费的主题 ,分区分配方式与 AUTO TOPICS 模式相同,
USER ASSIGNED 表示客户端指定了消费者消费的分区。
以上是关于kafka常用生产者消费者配置的主要内容,如果未能解决你的问题,请参考以下文章