Integrating Kafka with Spring Boot
Posted by 莒县程序猿
Preface
This article covers integrating Kafka with Spring Boot and consuming data through a listener.
Note: what follows is the main body of the article; the examples below are provided for reference.
I. How do you integrate Kafka with Spring Boot?
This post explains how to integrate Kafka with Spring Boot.
II. Steps
1. Configuring the YML file
The configuration is as follows (example):
spring:
  kafka:
    bootstrap-servers: 25.219.254.89:19092,25.219.254.90:19092,25.219.254.91:19092,25.219.254.92:19092,25.219.254.93:19092,25.219.254.94:19092,25.219.254.95:19092
    consumer:
      # value deserialization
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # key deserialization
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
      auto-commit-interval: 1000
      properties:
        sasl:
          mechanism:
          jaas:
            config: org.apache.kafka.common.security.scram.ScramLoginModule required username= password=
        security:
          protocol:
    listener:
      concurrency: 5
    properties:
      sasl:
        mechanism:
        jaas:
          config: org.apache.kafka.common.security.scram.ScramLoginModule required username= password=
      security:
        protocol:
The sasl entries supply the credentials used to connect to the cluster; the listener entry configures the listener (here with a concurrency of 5).
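For reference, a filled-in version of the SASL properties might look like the sketch below. The SCRAM-SHA-512 mechanism, SASL_PLAINTEXT protocol, and the username/password shown are assumptions for illustration; substitute the values issued for your own cluster.

properties:
  sasl:
    # assumed mechanism; your cluster may use SCRAM-SHA-256, SCRAM-SHA-512 or PLAIN
    mechanism: SCRAM-SHA-512
    jaas:
      # note the quotes and the trailing semicolon in the JAAS string
      config: org.apache.kafka.common.security.scram.ScramLoginModule required username="myUser" password="myPassword";
  security:
    # assumed protocol; SASL_SSL is also common
    protocol: SASL_PLAINTEXT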
2. The consumer config
The code is as follows (example):
@Configuration
@EnableKafka
public class KafkaListenerContainerFactory {

    // Bring in the values written in the YML file
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;

    @Value("${spring.kafka.consumer.properties.sasl.mechanism}")
    private String saslMechanism;

    @Value("${spring.kafka.consumer.properties.security.protocol}")
    private String securityProtocol;

    @Value("${spring.kafka.consumer.properties.sasl.jaas.config}")
    private String saslJaasConfig;

    @Bean("containerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> container = new ConcurrentKafkaListenerContainerFactory<>();
        container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps()));
        // Concurrency must be less than or equal to the number of partitions of the topic
        container.setConcurrency(1);
        // Listen in batches
        container.setBatchListener(true);
        // How offsets are committed
        container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return container;
    }

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>(8);
        // Kafka broker addresses
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Whether offsets are committed automatically
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        // Maximum number of records fetched per poll (example value)
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50");
        // Key and value deserializers
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        // SASL settings taken from the YML file
        props.put("sasl.mechanism", saslMechanism);
        props.put("security.protocol", securityProtocol);
        props.put("sasl.jaas.config", saslJaasConfig);
        return props;
    }
}
In the map you can fill in these settings either by injecting the YML values with @Value, as above, or by hard-coding them.
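As a sketch of the hard-coded alternative mentioned above, the properties could be written out directly. The broker address, mechanism, protocol, username, and password below are placeholder assumptions, not values from this article:

private Map<String, Object> consumerPropsHardCoded() {
    Map<String, Object> props = new HashMap<>(8);
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // Illustrative SASL values; replace with the ones issued for your cluster
    props.put("sasl.mechanism", "SCRAM-SHA-512");
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
            + "username=\"myUser\" password=\"myPassword\";");
    return props;
}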
3. Fetching data out of Kafka
@Component
public class ConsumerDemo {

    /**
     * Data returned by each listen.
     * Records are taken out in batches here.
     * @param record the batch of records returned by one poll
     */
    @KafkaListener(topics = "your-own-topic", containerFactory = "containerFactory", groupId = "myTopic")
    public void listener(List<ConsumerRecord<?, ?>> record) {
        List<String> messages = new ArrayList<>();
        record.forEach(records -> {
            Optional<?> kafkaMessage = Optional.ofNullable(records.value());
            kafkaMessage.ifPresent(o -> messages.add(o.toString()));
        });
    }
}
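Because the container factory above uses AckMode.MANUAL_IMMEDIATE and enable-auto-commit is false, offsets are only committed when the listener acknowledges them. A minimal sketch of the same batch listener with a manual acknowledgment (the class name is illustrative; Acknowledgment comes from org.springframework.kafka.support) might look like this:

@Component
public class ConsumerAckDemo {

    /**
     * Batch listener that commits offsets manually after processing.
     * @param records the batch of records returned by one poll
     * @param ack     handle used to commit the offsets of this batch
     */
    @KafkaListener(topics = "your-own-topic", containerFactory = "containerFactory", groupId = "myTopic")
    public void listener(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
        List<String> messages = new ArrayList<>();
        records.forEach(record -> {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            kafkaMessage.ifPresent(o -> messages.add(o.toString()));
        });
        // ... handle messages ...
        // Commit the offsets for this batch once processing has succeeded
        ack.acknowledge();
    }
}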
4. The pom file configuration
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13.2</version>
</dependency>
Summary
This is how Spring Boot integrates with Kafka, connects to it, and fetches data from it in batches.