利用Nacos服务获取配置逻辑的特点,实现动态配置kafak认证
Posted 程序员TD的生活
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了利用Nacos服务获取配置逻辑的特点,实现动态配置kafak认证相关的知识,希望对你有一定的参考价值。
我要做什么?
实现Nacos动态配置kafka认证信息,使每个微服务读取同一个kafka配置,并生成文件注入到环境变量中。
为什么要这么做?
首先我们看下
Kafka-java接入demo,如图:
1.prod_client_jaas.conf文件
KafkaClient{
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka_1"
password="密码";
};
2.cons_client_jaas.conf
KafkaClient{
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka_1"
password="密码";
};
3.producer
package com.sensetime.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* Producer
*
*/
public class App
{
public static void main( String[] args )
{
String fsPath=System.getProperty("user.dir");
System.out.println(fsPath);
System.setProperty("java.security.auth.login.config", ""+fsPath+"/conf/prod_client_jaas.conf"); // 环境变量添加,需要输入配置文件的路径
System.out.println("===================配置文件地址"+fsPath+"/conf/prod_client_jaas.conf");
Properties props = new Properties();
props.put("bootstrap.servers", "ip:9092,ip:9092,ip:9092"); //此处为kafka接入点
props.put("acks", "1");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty ("security.protocol", "SASL_PLAINTEXT");
props.setProperty ("sasl.mechanism", "PLAIN");
Producer producer = null;
try {
producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
String msg = "Message " + i;
producer.send(new ProducerRecord("testtime", msg)); //此处为创建的topic
System.out.println("Sent:" + msg);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}
4.consumer
package com.sensetime.kafka;
import java.util.Arrays;import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class Consumer {
public static void main(String[] args) {
String fsPath=System.getProperty("user.dir");
System.setProperty("java.security.auth.login.config", ""+fsPath+"/conf/cons_client_jaas.conf"); // 环境变量添加,需要输入配置文件的路径
System.out.println("===================配置文件地址"+fsPath+"/conf/cons_client_jaas.conf");
Properties props = new Properties();
props.put("bootstrap.servers", "ip:9092,ip:9092,ip:9092"); //kafka接入点
props.put("group.id", "group1"); //创建的group
props.put("group.name", "1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty ("security.protocol", "SASL_PLAINTEXT");
props.setProperty ("sasl.mechanism", "PLAIN");
KafkaConsumer kafkaConsumer = new Kafkansumer<>(props);
kafkaConsumer.subscribe(Arrays.asList("testtime")); //此处为订阅的topic
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("线程1"+":"+"Partition: " + record.partition() + " Offset: " + record.offset() + " Value: " + record.value() + " ThreadID: " + Thread.currentThread().getId());
}
}
}
}
这个配置认证方式的痛点在于1: 需要在springboot刚启动还未进行kafka建立连接之前,将认证信息注入到环境变量里边
2:需要每个微服务都要配置认证信息的文件。考虑到我们使用Nacos作为配置中心,我的想法是利用Nacos进行配置认证信息,并在springboot启动后kafka实例化前,读取认证信息,设置到环境变量里。
我是怎么做的?
首先看图所示,springboot启动时候会在refreshContext(context)之前执行初始化applyInitializers,当spring执行这个类PropertySourceBootstrapConfiguration的时候,会执行Nacos的相关获取配置解析配置的方法,所以,我只也搞一个同样的initalizer,并且改initalizer排序在Nacos执行之后,不就解决了吗,对的,就是这样的
实现代码如下:
注意:如果我们自己定义启动执行前的类需如下防范
public class MyApiApplication {
public static void main(String[] args) {
SpringApplication application = new SpringApplication(MyApiApplication.class);
application.addInitializers(new KafkaSaslConfiguration());
application.run(args);
}
}
public class KafkaSaslConfiguration implements
ApplicationContextInitializer<ConfigurableApplicationContext>, Ordered {
private static Logger log = LoggerFactory.getLogger(KafkaSaslConfiguration.class);
private static final String STORE_CONS_CLIENT_JAAS_PATH = "./conf/cons_client_jaas.conf";
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE + 11;
}
@Override
public void initialize(ConfigurableApplicationContext applicationContext) {
ConfigurableEnvironment environment = applicationContext.getEnvironment();
String username = environment.getProperty("kafka-sasl-username");
String password = environment.getProperty("kafka-sasl-password");
if (StringUtils.isBlank(username) || StringUtils.isBlank(password)) {
log.error("kafka sasl need kafka-sasl-username and kafka-sasl-password,please set value for Nacos.");
System.exit(-1);
}
String kafkaClient = "KafkaClient{\n" +
" org.apache.kafka.common.security.plain.PlainLoginModule required\n" +
" username=\"" + username + "\"\n" +
" password=\"" + password + "\";\n" +
" };";
String clientPath = System.getProperty("user.dir") + "/conf/cons_client_jaas.conf";
System.setProperty("java.security.auth.login.config", clientPath);
ResourceUtils.readStringToDisk(kafkaClient, STORE_CONS_CLIENT_JAAS_PATH);
log.info("java.security.auth.login.config: {}", clientPath);
}
}
以上是关于利用Nacos服务获取配置逻辑的特点,实现动态配置kafak认证的主要内容,如果未能解决你的问题,请参考以下文章