Secured Kafka (Kerberos) configuration based on Spring functional programming


When connecting to a Kerberos-secured Kafka cluster through the Spring Cloud Stream binder, the configuration should look like the following:

#input
spring.cloud.stream.bindings.process-in-0.destination=input-topic
spring.cloud.stream.bindings.process-in-0.binder=kafka1
spring.cloud.stream.bindings.process-in-0.group=groupId1
spring.cloud.stream.bindings.process-in-0.consumer.startOffset=latest
spring.cloud.stream.binders.kafka1.type=kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.brokers=host1:port,host2:port
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.autoCreateTopics=false
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.jaas.options.serviceName=kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.jaas.options.storeKey=true
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.jaas.options.keyTab=/file/input.keytab
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.jaas.options.principal=user@domain.com
spring.cloud.stream.binders.kafka1.environment.spring.kafka.consumer.value-deserializer=com.kafka.message.serializer.EntityDeserialize
#output
spring.cloud.stream.bindings.process-out-0.destination=output-topic
spring.cloud.stream.bindings.process-out-0.binder=kafka2
spring.cloud.stream.bindings.process-out-0.group=groupId2
spring.cloud.stream.binders.kafka2.type=kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.brokers=host1:port,host2:port
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.autoCreateTopics=false
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.jaas.options.serviceName=kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.jaas.options.storeKey=true
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.jaas.options.keyTab=/file/output.keytab
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.jaas.options.principal=user@domain.com
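
For reference, the jaas.options block above is roughly what the binder uses to build the Kerberos login configuration for the underlying Kafka clients. Expressed as raw Kafka client security properties it is conceptually equivalent to the sketch below (the exact properties the binder generates may differ between versions):

security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
    useKeyTab=true \
    storeKey=true \
    keyTab="/file/input.keytab" \
    principal="user@domain.com";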


At the same time, the following Java system properties have to be set:

System.setProperty("java.security.krb5.conf", "/file/krb5.conf");
System.setProperty("sun.security.krb5.debug", "true");
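
These properties need to be in place before any Kafka client performs its Kerberos login. One way to guarantee that is to set them at the very top of the application's main method; a minimal sketch, assuming a standard Spring Boot entry point (the class name KafkaKerberosApplication is illustrative):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaKerberosApplication {

    public static void main(String[] args) {
        // Kerberos-related JVM properties, set before the Spring context creates the Kafka clients
        System.setProperty("java.security.krb5.conf", "/file/krb5.conf");
        System.setProperty("sun.security.krb5.debug", "true");
        SpringApplication.run(KafkaKerberosApplication.class, args);
    }
}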


In addition, to deserialize the Kafka message payload, besides the basic-type serializers/deserializers provided out of the box, we can also supply a custom implementation, such as the com.kafka.message.serializer.EntityDeserialize configured above:
import java.io.IOException;

import org.apache.kafka.common.serialization.Deserializer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;

public class EntityDeserialize implements Deserializer<EntityTest> {

    // Reuse a single Jackson reader bound to the target payload type
    private final ObjectReader objectReader = new ObjectMapper().readerFor(EntityTest.class);

    @Override
    public EntityTest deserialize(String topic, byte[] data) {
        try {
            return objectReader.readValue(data);
        } catch (IOException e) {
            throw new RuntimeException("message deserialize error", e);
        }
    }
}
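
The payload class EntityTest is not shown in the original post; for completeness, a minimal Jackson-friendly sketch (the fields here are purely illustrative assumptions) might look like:

// Hypothetical payload type assumed by EntityDeserialize and the function bean below
public class EntityTest {

    private String id;
    private String payload;

    public EntityTest() {
        // no-arg constructor required by Jackson
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPayload() {
        return payload;
    }

    public void setPayload(String payload) {
        this.payload = payload;
    }
}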


// The corresponding functional bean
@Bean
public Function<Flux<Message<EntityTest>>, Flux<Message<String>>> process(MessageProcessService messageProcessService) {
    // delegate to the injected service instance (not a static call)
    return messageFlux -> messageProcessService.process(messageFlux);
}
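
MessageProcessService itself is not shown in the post; a minimal reactive sketch (everything except the method signature, which is implied by the bean above, is an assumption) could be:

import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import reactor.core.publisher.Flux;

// Hypothetical implementation; only the signature is implied by the function bean above
@Service
public class MessageProcessService {

    public Flux<Message<String>> process(Flux<Message<EntityTest>> messageFlux) {
        // map each incoming EntityTest message to an outgoing String message
        return messageFlux.map(message -> MessageBuilder
                .withPayload(message.getPayload().getPayload())
                .copyHeaders(message.getHeaders())
                .build());
    }
}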

