kafka2.5.0生产者与消费者配置详解

Posted zhuwenjoyce

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka2.5.0生产者与消费者配置详解相关的知识,希望对你有一定的参考价值。

1)引入maven依赖

我这里使用的是springboot 2.1.3.RELEASE 版本:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

会引入一对的kafka包:

技术图片

 

 2)生产者配置:

所有配置参考kafka-clients-2.5.0.jar包里的org.apache.kafka.clients.producer.ProducerConfig类,并且在该类中可以查看所有配置项的默认值: CONFIG = (new ConfigDef()).define(  这里的define方法的第三个参数就是默认值

application.properties里可以这样配置:

#####################  重要配置  ######################
spring.kafka.producer.bootstrap.servers=192.168.2.60:9092,192.168.2.62:9092
spring.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
# acks=0  如果设置为0,生产者将不等待任何来自服务器的确认。每个记录返回的偏移量将始终设置为-1。
# acks=1  这意味着leader确认消息即可,但不等待所有副本的完全确认的情况下进行响应。在这种情况下,如果leader在确认记录后立即失败,但是在副本复制它之前,那么记录将丢失。
# acks=all  不仅需要leader确认收到消息,还将等待全部的副本确认。这保证了只要至少有一个副本保持活动状态,记录就不会丢失。这是最有力的保证。这相当于ack =-1设置。
# acks=-1   跟集群有关
# 默认 1
spring.kafka.producer.acks=1
# 一个批次发送的大小,默认16KB,超过这个大小就会发送数据
spring.kafka.producer.batch.size=16384
# 一个批次最长等待多久就发送数据,默认0,即马上发送
spring.kafka.producer.linger.ms=5000
# 控制生产者最大发送大小,默认 1MB。这个值必须小于kafka服务器server.properties配置文件里的最大可接收数据大小配置:socket.request.max.bytes=104857600 (默认104857600 = 100MB)
spring.kafka.producer.max.request.size=1048576
 
#####################  非重要配置  ######################
# 生产者内存缓冲区大小。默认33554432bytes=32MB
spring.kafka.producer.buffer.memory=33554432
# 发送重试次数,默认 2147483647,接近无限大
spring.kafka.producer.retries=3
# 请求超时时间,默认30秒
spring.kafka.producer.request.timeout.ms=30000
# 默认值5。并发状态下,kafka生产者允许存在最大的kafka服务端未确认接收的消息个数最大值。
# 注意,如果该值设置为1,并且开启重试机制,则会在允许的重试次数内,阻塞其他消息发送到kafka Server端。并且为1的话,会严重影响生产者的吞吐量。仅适用于对数据有严格顺序要求的场景。
spring.kafka.producer.max.in.flight.requests.per.connection=5
# 最大阻塞时间,超过则抛出异常。默认60秒
spring.kafka.max.block.ms=60000
# 数据压缩类型:none、gzip、snappy、lz4、zstd,默认none什么都不做
spring.kafka.compression.type=none

在springboot框架里,手动封装 @bean对象:

@Configuration
@EnableKafka
public class KafkaProducerConfig {
    @Value("${kafka.producer.servers}")
    private String servers;
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(props);
        KafkaTemplate kafkaTemplate
                = new KafkaTemplate<String, String>(factory) ;
        //kafkaTemplate.setProducerListener();
        return kafkaTemplate;
    }
}

  

 

 

3)消费者配置:

 

 

 

end.

以上是关于kafka2.5.0生产者与消费者配置详解的主要内容,如果未能解决你的问题,请参考以下文章

kafka2.5.0自定义数据序列化类

kafka2.5.0详解核心配置文件server.properties

kafka2.5.0分区再均衡监听器java例子(待续)

kafka2.5.0基本命令

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段

kafka2.5.0硬件集群架构图Topic主题与Partitions分区架构图