SpringBoot整合Kafka

Posted 秃头谷雨

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot整合Kafka相关的知识,希望对你有一定的参考价值。

目录

前言

一、SpringBoot怎么整合Kafka?

二、使用步骤

1.YML文件的配置

2.消费者的config

3.如何取出kafka数据

4.pom文件的配置

总结



前言

SpringBoot整合Kafka 监听拿取数据


提示:以下是本篇文章正文内容,下面案例可供参考

一、SpringBoot怎么整合Kafka?

这篇讲的是SpringBoot如何整合KAf'ka

二、使用步骤

1.YML文件的配置

代码如下(示例):

spring:
  kafka:

    bootstrap-servers: 25.219.254.89:19092,25.219.254.90:19092,25.219.254.91:19092,25.219.254.92:19092,25.219.254.93:19092,25.219.254.94:19092,25.219.254.95:19092
    consumer:
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    #value 反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  
    #key 反序列化
      enable-auto-commit: false
      auto-commit-interval: 1000
      properties:
        sasl:
          mechanism: 
        security:
          protocol: 
         jaas:
          config: org.apache.kafka.common.security.scram.ScramLoginModule required username= password=
    listener:
      concurrency: 5
    properties:
      sasl:
        mechanism: 
        jaas:
          config: org.apache.kafka.common.security.scram.ScramLoginModule required username= password=
      security:
        protocol: 

sasl:密钥的连入 listener 监听器的配置

2.消费者的config

代码如下(示例):

@Configuration
@EnableKafka
public class KafkaListenerContainerFactory {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
//把YML写的引入进来
@Value("${spring.kafka.xxx}")

    @Bean("containerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> container = new ConcurrentKafkaListenerContainerFactory<>();
        container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps()));
        // 设置并发量,小于或等于Topic的分区数
        container.setConcurrency(1);
        // 设置为批量监听
        container.setBatchListener(true);
        // 设置提交偏移量的方式
        container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return container;
    }
    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>(8);
        // kafka服务地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // 设置是否自动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, );
        // 一次拉取消息数量
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "");
        // 序列化
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, );
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, );

        props.put("sasl.mechanism",);
        props.put("security.protocol",);
        props.put("sasl.jaas.config",);

        return props;
    }

}

map里面把该配置的通过@value引入进来填入 或者手写填入都可以

3.如何取出kafka数据

@Component
public class ConsumerDemo {


    /**
     * 每次监听返回的数据
     * 这里是批量取出
     * @param record
     */
    @KafkaListener(topics = "自己的topics",containerFactory = "containerFactory",groupId = "myTopic")
    public void  listener(List<ConsumerRecord<?,?>> record
                          ){
        List<String> messages = new ArrayList<>();
        record.forEach(records ->{
            Optional<?> kafkaMessage = Optional.ofNullable(records.value());
            kafkaMessage.ifPresent(o -> messages.add(o.toString()));
        });

         }

}

4.pom文件的配置

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.0</version>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13.2</version>
</dependency>

总结

这就是Springboot去整合卡夫卡 并且连入批量取出其中的数据

以上是关于SpringBoot整合Kafka的主要内容,如果未能解决你的问题,请参考以下文章

springboot整合kafka,kafka消息过滤

SpringBoot Kafka 整合实例教程

Windows平台整合SpringBoot+KAFKA__第2部分_代码编写前传

SpringBoot整合Kafka

SpringBoot整合Kafka

SpringBoot整合Kafka