Kafka两种配置文件方式
Posted 一个抓手
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka两种配置文件方式相关的知识,希望对你有一定的参考价值。
1.yml配置文件(简单配置)
spring:
kafka:
bootstrap-servers: ip:端口
consumer:
group-id: group-test
enable-auto-commit: true
auto-commit-interval: 1000ms
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
2.类配置方式(使用了SASL/PLAINTEXT安全认证协议)
出现[Consumer clientId=consumer-1, groupId=group1] Bootstrap broker xxx (id: -1 rack: null) disconnected报错信息就需要用这种配置方式,加入安全认证
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
/**
* @author 向振华
* @date 2021/05/10 15:58
*/
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(10);
factory.getContainerProperties().setPollTimeout(1500);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>(16);
// 服务地址
propsMap.put("bootstrap.servers", "ip:端口");
// 安全认证协议
propsMap.put("security.protocol", "SASL_PLAINTEXT");
propsMap.put("sasl.mechanism", "PLAIN");
// 填充安全认证用户名和密码
propsMap.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\\"usnm\\" password=\\"pwd\\";");
propsMap.put("group.id", "group-test");
propsMap.put("enable.auto.commit", true);
propsMap.put("auto.commit.interval.ms", 1000);
// latest: 从最新的偏移量开始消费
propsMap.put("auto.offset.reset", "latest");
// 反序列化方式
propsMap.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
propsMap.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return propsMap;
}
}
再加一个配置文件sasl.jaas.config到resource
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafkaadmin"
password="kafkaadminpwd"
user_kafkaadmin="kafkaadminpwd"
user_kafkaclient1="kafkaclient1pwd"
user_kafkaclient2="kafkaclient2pwd";
};
以上是关于Kafka两种配置文件方式的主要内容,如果未能解决你的问题,请参考以下文章