Kafka生产者默认配置消费者默认配置说明

Posted 顧棟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka生产者默认配置消费者默认配置说明相关的知识,希望对你有一定的参考价值。

Kafka生产者默认配置

参数名默认值参数说明
retries0设置大于零的值将导致客户端重新发送任何发送失败并可能出现暂时性错误的记录。
acks1在认为请求完成之前,生产者要求领导者收到的确认数。
acks=0,如果设置为零,那么生产者根本不会等待来自服务器的任何确认。
acks=1,只要分区的Leader副本写入成功就有会返回。
acks=-1acks=all,需要所有副本都写入完毕。
compression.typenone生产者进行消息压缩配置,none不压缩,值gzipsnappylz4。压缩是对整批数据进行的,所以batching的效率也会影响压缩率(更多batching意味着更好的压缩)。
batch.size16384(16KB)BatchRecords的大小
linger.ms0在达不到批次消息大小的时候 可以调整等待周期,单位ms
client.id“ ”客户端id
send.buffer.bytes128 * 1024 (128KB)发送数据时使用的 TCP 发送缓冲区 (SO_SNDBUF) 的大小。 如果值为 -1,则将使用操作系统默认值。单位B
receive.buffer.bytes32 * 1024 (32KB)读取数据时使用的 TCP 接收缓冲区 (SO_RCVBUF) 的大小。 如果值为 -1,则将使用操作系统默认值。单位B
max.request.size1 * 1024 * 1024 (1MB)请求的最大大小(以字节为单位)。 此设置将限制生产者将在单个请求中发送的记录批次数,以避免发送大量请求。 这实际上也是最大记录批量大小的上限。 请注意,服务器有自己的记录批量大小上限,可能与此不同。单位B
reconnect.backoff.ms50L在尝试重新连接到给定主机之前等待的基本时间。 这避免了在紧密循环中重复连接到主机。 此退避适用于客户端到代理的所有连接尝试。单位ms
reconnect.backoff.max.ms1000L重新连接到反复连接失败的代理时要等待的最长时间(以毫秒为单位)。 如果提供,则每个主机的退避将在每次连续连接失败时呈指数增加,直至达到此最大值。 计算回退增量后,添加 20% 的随机抖动以避免连接风暴。单位ms
retry.backoff.ms100L在尝试重试对给定主题分区的失败请求之前等待的时间。 这避免了在某些故障情况下在紧密循环中重复发送请求。单位ms
max.block.ms60s配置控制 KafkaProducer.send()KafkaProducer.partitionsFor() 将阻塞多长时间。 由于缓冲区已满或元数据不可用,这些方法可能会被阻止。 用户提供的序列化程序或分区程序中的阻塞不会计入此超时。单位ms
request.timeout.ms30s这应该大于replica.lag.time.max.ms(代理配置)以减少由于不必要的生产者重试而导致消息重复的可能性。
metadata.max.age.ms30s即使我们没有看到任何分区领导更改以主动发现任何新代理或分区,在这个时间内我们也会强制刷新元数据,单位ms。
metrics.sample.window.ms30s计算指标样本的时间窗口。
metrics.num.samples2为计算指标而维护的样本数。
metrics.log.level“0”指标输入日志级别,“0”:INFO,”1“:DEBUG
metric.reportersCollections.emptyList()用作指标报告器的类列表。 实现 org.apache.kafka.common.metrics.MetricsReporter 接口允许插入将在新指标创建时通知的类。 JmxReporter 总是包含在注册 JMX 统计信息中。
max.in.flight.requests.per.connection5客户端在阻塞前将在单个连接上发送的最大未确认请求数。 请注意,如果此设置设置为大于 1 且发送失败,则存在由于重试(即启用重试)而导致消息重新排序的风险。
connections.max.idle.ms9 * 60 * 1000在此配置指定的毫秒数后关闭空闲连接。默认值9分钟,单位ms。
partitioner.classDefaultPartitioner.class分区器。实现 org.apache.kafka.clients.producer.Partitioner 接口的 Partitioner 类。
interceptor.classesCollections.emptyList()用作拦截器的类列表。 实现 org.apache.kafka.clients.producer.ProducerInterceptor 接口允许您在将生产者收到的记录发布到 Kafka 集群之前拦截(并可能改变)它们。 默认情况下,没有拦截器。
security.protocolPLAINTEXT用于与代理通信的协议。 有效值为:Utils.join(SecurityProtocol.names(), ", ") 。
enable.idempotencefalse幂等是否开启,当设置为“true”时,生产者将确保在流中写入每条消息的一个副本。 如果为“false”,则生产者由于代理失败等而重试,可能会在流中写入重试消息的副本。 请注意,启用幂等性要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于或等于 5, RETRIES_CONFIG 大于 0 且ACKS_CONFIG必须为all。 如果用户未明确设置这些值,则会选择合适的值。 如果设置了不兼容的值,则会抛出 ConfigException。
transaction.timeout.ms60s事务超时时间,事务协调器在主动中止正在进行的事务之前等待来自生产者的事务状态更新的最长时间(以毫秒为单位)。 如果此值大于代理中的 transaction.max.timeout.ms 设置,则请求将失败并显示lnvalidTransactionTimeout错误。
transactional.idnull事务id,用于事务性交付的 TransactionalId。 这实现了跨越多个生产者会话的可靠性语义,因为它允许客户端保证使用相同 TransactionalId 的事务在开始任何新事务之前已经完成。 如果未提供 TransactionalId,则生产者仅限于幂等交付。 请注意,如果配置了 TransactionalId,则必须启用 enable.idempotence。 默认为 null,表示不能使用事务。 请注意,默认情况下,事务需要至少三个代理的集群,这是生产的推荐设置; 对于开发,您可以通过调整代理设置transaction.state.log.replication.factor 来改变这一点。

ProducerConfig类中static{}块。

static {
    CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(), new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
                            .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
                            .define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
                            .define(ACKS_CONFIG,
                                    Type.STRING,
                                    "1",
                                    in("all", "-1", "0", "1"),
                                    Importance.HIGH,
                                    ACKS_DOC)
                            .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
                            .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
                            .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC)
                            .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
                            .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
                            .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC)
                            .define(MAX_REQUEST_SIZE_CONFIG,
                                    Type.INT,
                                    1 * 1024 * 1024,
                                    atLeast(0),
                                    Importance.MEDIUM,
                                    MAX_REQUEST_SIZE_DOC)
                            .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 50L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
                            .define(RECONNECT_BACKOFF_MAX_MS_CONFIG, Type.LONG, 1000L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC)
                            .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
                            .define(MAX_BLOCK_MS_CONFIG,
                                    Type.LONG,
                                    60 * 1000,
                                    atLeast(0),
                                    Importance.MEDIUM,
                                    MAX_BLOCK_MS_DOC)
                            .define(REQUEST_TIMEOUT_MS_CONFIG,
                                    Type.INT,
                                    30 * 1000,
                                    atLeast(0),
                                    Importance.MEDIUM,
                                    REQUEST_TIMEOUT_MS_DOC)
                            .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC)
                            .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
                                    Type.LONG,
                                    30000,
                                    atLeast(0),
                                    Importance.LOW,
                                    CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
                            .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
                            .define(METRICS_RECORDING_LEVEL_CONFIG,
                                    Type.STRING,
                                    Sensor.RecordingLevel.INFO.toString(),
                                    in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()),
                                    Importance.LOW,
                                    CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
                            .define(METRIC_REPORTER_CLASSES_CONFIG,
                                    Type.LIST,
                                    Collections.emptyList(),
                                    new ConfigDef.NonNullValidator(),
                                    Importance.LOW,
                                    CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
                            .define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
                                    Type.INT,
                                    5,
                                    atLeast(1),
                                    Importance.LOW,
                                    MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
                            .define(KEY_SERIALIZER_CLASS_CONFIG,
                                    Type.CLASS,
                                    Importance.HIGH,
                                    KEY_SERIALIZER_CLASS_DOC)
                            .define(VALUE_SERIALIZER_CLASS_CONFIG,
                                    Type.CLASS,
                                    Importance.HIGH,
                                    VALUE_SERIALIZER_CLASS_DOC)
                            /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
                            .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
                                    Type.LONG,
                                    9 * 60 * 1000,
                                    Importance.MEDIUM,
                                    CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
                            .define(PARTITIONER_CLASS_CONFIG,
                                    Type.CLASS,
                                    DefaultPartitioner.class,
                                    Importance.MEDIUM, PARTITIONER_CLASS_DOC)
                            .define(INTERCEPTOR_CLASSES_CONFIG,
                                    Type.LIST,
                                    Collections.emptyList(),
                                    new ConfigDef.NonNullValidator(),
                                    Importance.LOW,
                                    INTERCEPTOR_CLASSES_DOC)
                            .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
                                    Type.STRING,
                                    CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
                                    Importance.MEDIUM,
                                    CommonClientConfigs.SECURITY_PROTOCOL_DOC)
                            .withClientSslSupport()
                            .withClientSaslSupport()
                            .define(ENABLE_IDEMPOTENCE_CONFIG,
                                    Type.BOOLEAN,
                                    false,
                                    Importance.LOW,
                                    ENABLE_IDEMPOTENCE_DOC)
                            .define(TRANSACTION_TIMEOUT_CONFIG,
                                    Type.INT,
                                    60000,
                                    Importance.LOW,
                                    TRANSACTION_TIMEOUT_DOC)
                            .define(TRANSACTIONAL_ID_CONFIG,
                                    Type.STRING,
                                    null,
                                    new ConfigDef.NonEmptyString(),
                                    Importance.LOW,
                                    TRANSACTIONAL_ID_DOC);
}

其中还有关于SSL和SASL的配置在.withClientSslSupport().withClientSaslSupport()中,涉及类SslConfigsSaslConfigs

Kafka消费者默认配置

参数名默认值参数说明
group.id" "标识此消费者所属的消费者组的唯一字符串。 如果消费者通过使用 subscribe(topic) 或基于 Kafka 的偏移管理策略使用组管理功能,则需要此属性。
session.timeout.ms10000使用 Kafka 的组管理工具时用于检测消费者故障的超时时间。 消费者定期发送心跳以向代理表明其活跃度。 如果在此会话超时到期之前代理没有收到心跳,则代理将从组中删除此消费者并启动重新平衡。 请注意,该值必须在 group.min.session.timeout.msgroup.max.session.timeout.ms 在代理配置中配置的允许范围内 .
heartbeat.interval.ms3000使用 Kafka 的组管理工具时,消费者协调器的心跳之间的预期时间。 心跳用于确保消费者的会话保持活跃,并在新消费者加入或离开组时促进重新平衡。 该值必须设置为低于 session.timeout.ms,但通常不应设置为高于该值的 1/3。 它可以调整得更低,以控制正常重新平衡的预期时间。
partition.assignment.strategyCollections.singletonList(RangeAssignor.class)消费者分配分区策略。
metadata.max.age.ms5 * 60 * 1000即使我们没有看到任何分区领导更改以主动发现任何新代理或分区,在这个时间内我们也会强制刷新元数据,单位ms。
enable.auto.committrue如果为true,消费者的偏移量将在后台定期提交。
auto.commit.interval.ms5000如果 enable.auto.commit 设置为 true,则消费者偏移量自动提交给 Kafka 的频率(以毫秒为单位)。
client.id" "客户端id
max.partition.fetch.bytes1 * 1024 * 1024服务器将返回的每个分区的最大数据量。 记录由消费者批量获取。 如果 fetch 的第一个非空分区中的第一个记录批次大于此限制,则仍会返回该批次以确保消费者可以取得进展。 代理接受的最大记录批量大小通过 message.max.bytes(代理配置)或 max.message.bytes(主题配置)定义。 请参阅FETCH_MAX_BYTES_CONFIG以限制消费者请求大小。
send.buffer.bytes128 * 1024发送数据时使用的 TCP 发送缓冲区 (SO_SNDBUF) 的大小。 如果值为 -1,则将使用操作系统默认值。单位B
receive.buffer.bytes64 * 1024读取数据时使用的 TCP 接收缓冲区 (SO_RCVBUF) 的大小。 如果值为 -1,则将使用操作系统默认值。单位B
fetch.min.bytes1服务器应为获取请求返回的最小数据量。 如果没有足够的数据可用,则请求将在回答请求之前等待积累足够多的数据。 1 字节的默认设置意味着只要有一个字节的数据可用或获取请求超时等待数据到达就会响应获取请求。 将此设置为大于 1 的值将导致服务器等待大量数据积累,这可以稍微提高服务器吞吐量,但会增加一些延迟。
fetch.max.bytes50 * 1024 * 1024服务器应为获取请求返回的最大数据量。 记录由消费者分批获取,如果获取的第一个非空分区中的第一个记录批次大于该值,则仍会返回该记录批次,以确保消费者可以取得进展。 因此,这不是绝对最大值。 代理接受的最大记录批量大小通过 message.max.bytes(代理配置)或 max.message.bytes(主题配置)定义。 请注意,消费者并行执行多次提取。
fetch.max.wait.ms500如果没有足够的数据来立即满足 fetch.min.bytes 给出的要求,则服务器在响应 fetch 请求之前将阻塞的最长时间。
reconnect.backoff.ms50L在尝试重新连接到给定主机之前等待的基本时间。 这避免了在紧密循环中重复连接到主机。 此退避适用于客户端到代理的所有连接尝试。单位ms
reconnect.backoff.max.ms1000L重新连接到反复连接失败的代理时要等待的最长时间(以毫秒为单位)。 如果提供,则每个主机的退避将在每次连续连接失败时呈指数增加,直至达到此最大值。 计算回退增量后,添加 20% 的随机抖动以避免连接风暴。单位ms
retry.backoff.ms100L在尝试重试对给定主题分区的失败请求之前等待的时间。 这避免了在某些故障情况下在紧密循环中重复发送请求。单位ms
auto.offset.reset“latest”“latest”, “earliest”, “none”。
当Kafka中没有初始偏移量或者服务器上不再存在当前偏移量时(例如因为该数据已被删除)该怎么办:
1.earliest:自动将偏移量重置为最早的偏移量
2.latest:自动将偏移量重置为最新的偏移量
3.none:如果没有找到消费者组的先前偏移量,则向消费者抛出异常
4.任何其他:向消费者抛出异常 消费者。
check.crcstrue自动检查消耗记录的CRC32。 这可确保不会对发生的消息进行在线或磁盘损坏。 此检查会增加一些开销,因此在寻求极端性能的情况下可能会禁用它。
metrics.sample.window.ms30000计算指标样本的时间窗口。
metrics.num.samples2为计算指标而维护的样本数。
metrics.log.levelSensor.RecordingLevel.INFO.toString()指标输入日志级别,“0”:INFO,”1“:DEBUG
metric.reportersCollections.emptyList()用作指标报告器的类列表。 实现 org.apache.kafka.common.metrics.MetricsReporter 接口允许插入将在新指标创建时通知的类。 JmxReporter 总是包含在注册 JMX 统计信息中。
request.timeout.ms305000chosen to be higher than the default of max.poll.interval.ms这应该大于replica.lag.time.max.ms(代理配置)以减少由于不必要的生产者重试而导致消息重复的可能性。
connections.max.idle.ms9 * 60 * 1000在此配置指定的毫秒数后关闭空闲连接。默认值9分钟,单位ms。
interceptor.classesCollections.emptyList()用作拦截器的类列表。 实现 org.apache.kafka.clients.producer.ProducerInterceptor 接口允许您在将生产者收到的记录发布到 Kafka 集群之前拦截(并可能改变)它们。 默认情况下,没有拦截器。
max.poll.records500单次调用 poll() 时返回的最大记录数。
max.poll.interval.ms300000使用消费者组管理时调用 poll() 之间的最大延迟。 这为消费者在获取更多记录之前可以空闲的时间设置了上限。 如果在此超时到期之前没有调用 poll(),则认为消费者失败,组将重新平衡以将分区重新分配给另一个成员。
exclude.internal.topicstrue是否应该将来自内部主题(如偏移量)的记录公开给消费者。如果设置为true,从内部主题接收记录的唯一方法是订阅它。
internal.leave.group.on.closetrue消费者是否应该离群。
如果设置为false,那么session.timeout.ms秒后才会重新平衡。
这是一种内部配置,将来可以以一种向后不兼容的方式进行更改
isolation.level“0”控制如何读取以事务方式编写的消息。
如果设置为read_committed, consumer.poll()将只返回已经提交的事务性消息。
如果设置为read_uncommitted默认值),consumer.poll()将返回所有消息,甚至是已经中止的事务性消息。
非事务性消息将在任一模式下无条件返回。
消息将始终按偏移顺序返回。因此,在 read_committed 模式下,consumer.poll() 只会返回直到最后一个稳定偏移量 (LSO) 的消息,该偏移量小于第一个打开事务的偏移量。特别是在属于正在进行的交易的消息之后出现的任何消息将被保留,直到相关的交易完成。因此,read_committed 消费者在有飞行交易时将无法读取到高水位线。
此外,当处于 read_committed 时seekToEnd 方法将返回 LSO。
security.protocolPLAINTEXT用于与代理通信的协议。 有效值为:Utils.join(SecurityProtocol.names(), ", ") 。

ConsumerConfig类中static{}块。

static {
    CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
                                    Type.LIST,
                                    Collections.emptyList(),
                                    new ConfigDef.NonNullValidator(),
                                    Importance.HIGH,
                                    CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
                            .define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC)
                            .define(SESSION_TIMEOUT_MS_CONFIG,
                                    Type.INT,
                                    10000,
                                    Importance.HIGH,
                                    SESSION_TIMEOUT_MS_DOC)
                            .define(HEARTBEAT_INTERVAL_MS_CONFIG,
                                    Type.INT,
                                    3000,
                                    Importance.HIGH,
                                    HEARTBEAT_INTERVAL_MS_DOC)
                            .define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                                    Type.LIST,
                                    Collections.singletonList(RangeAssignor.class),
                                    new ConfigDef.NonNullValidator(),
                                    Importance.MEDIUM,
                                    PARTITION_ASSIGNMENT_STRATEGY_DOC)
                            .define(METADATA_MAX_AGE_CONFIG,
                                    Type.LONG,
                                    5 * 60 * 1000,
                                    atLeast(0),
                                    Importance.LOW,
                                    CommonClientConfigs.METADATA_MAX_AGE_DOC)
                            .define(ENABLE_AUTO_COMMIT_CONFIG,
                                    Type.BOOLEAN,
                                    true,
                                    Importance.MEDIUM,
                                    ENABLE_AUTO_COMMIT_DOC)
                            .define(AUTO_COMMIT_INTERVAL_MS_CONFIG,
                                    Type.INT,
                                    5000,
                                    atLeast(0),
                                    Importance.LOW,
                                    AUTO_COMMIT_INTERVAL_MS_DOC)
                            .define(CLIENT_ID_CONFIG,
                                    Type.STRING,
                                    "",
                                    Importance.LOW,
                                    CommonClientConfigs.CLIENT_ID_DOC)
                            .define(MAX_PARTITION_FETCH_BYTES_CONFIG,
                                    Type.INT,
                                    DEFAULT_MAX_PARTITION_FETCH_BYTES,
                                    atLeast(0),
                                    Importance.HIGH,
                                    MAX_PARTITION_FETCH_BYTES_DOC)
                            .define(SEND_BUFFER_CONFIG,
                                    Type.INT,
                                    128 * 1024,
                                    atLeast(-1),
                                    Importance.MEDIUM,
                                    CommonClientConfigs.SEND_BUFFER_DOC)
                            .define(RECEIVE_BUFFER_CONFIG,
                                    Type.INT,
                                    64 * 1024,
                                    atLeast(-1),
                                    Importance.MEDIUM,
                                    CommonClientConfigs.RECEIVE_BUFFER_DOC)
                            .define(FETCH_MIN_BYTES_CONFIG,
                                    Type.INT,
                                    1,
                                    atLeast(0),
                                    Importance.HIGH,
                                    FETCH_MIN_BYTES_DOC)
                            .define(FETCH_MAX_BYTES_CONFIG,
                                    Type.INT,
                                    DEFAULT_FETCH_MAX_BYTES,
                                    atLeast(0),
                                    Importance.MEDIUM,
                                    FETCH_MAX_BYTES_DOC)
                            .define(FETCH_MAX_WAIT_MS_CONFIG,
                                    Type.INT,
                                    500,
                                    atLeast(0),
                                    Importance.LOW,
                                    FETCH_MAX_WAIT_MS_DOC)
                            .define(RECONNECT_BACKOFF_MS_CONFIG,
                                    Type.LONG,
                                    50L,
          

以上是关于Kafka生产者默认配置消费者默认配置说明的主要内容,如果未能解决你的问题,请参考以下文章

Kafka 之 中级

kafka参数配置

kafka入门使用

kafka常用生产者消费者配置

Java调用Kafka生产者,消费者Api及相关配置说明

如何将数据从 Kafka 传递到 Spark Streaming?