基于 Spring functional 的Secured Kafka (kerberos) configuration
Posted 勿忘勿助,看平地长得万丈高
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于 Spring functional 的Secured Kafka (kerberos) configuration相关的知识,希望对你有一定的参考价值。
基于kerberos 加密的kafka消息我们在用spring binder链接的时候配置应当如下:
#input spring.cloud.stream.bindings.process-in-0.destination=input-topic spring.cloud.stream.bindings.process-in-0.binder=kafka1 spring.cloud.stream.bindings.process-in-0.group=groupId1 spring.cloud.stream.bindings.process-in-0.consumer.startOffset=latest spring.cloud.stream.binders.kafka1.type=kafka spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.brokers=host1:port,host2:port spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.autoCreateTopics=false spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.jaas.options.serviceName=kafka spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.jaas.options.storeKey=true spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.jaas.options.keyTab=/file/input.keytab spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.jaas.options.principal=user@domain.com spring.cloud.stream.binders.kafka1.environment.spring.kafka.consumer.value-deserializer=com.kafka.message.serializer.EntityDeserialize #output spring.cloud.stream.bindings.process-out-0.destination=output-topic spring.cloud.stream.bindings.process-out-0.binder=kafka2 spring.cloud.stream.bindings.process-out-0.group=groupId2 spring.cloud.stream.binders.kafka2.type=kafka spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.brokers=host1:port,host2:port spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.autoCreateTopics=false spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.jaas.options.serviceName=kafka spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.jaas.options.storeKey=true spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.jaas.options.keyTab=/file/output.keytab spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.jaas.options.principal=user@domain.com
同时要设置java变量
System.setProperty("java.security.krb5.conf", "/file/krb5.conf"); System.setProperty("sun.security.krb5.debug", "true");
另外,要序列化kafka message的内容除了
默认提供的基本类型序列化器,我们也可以自定义例如上面配置的
com.kafka.message.serializer.EntityDeserialize
public class EntityDeserialize implements Deserializer { private ObjectReader objectReader = new ObjectMapper().readerFor(EntityTest.class); @Override public EntityTest deserialize(String arg0, byte[] arg1) { try { return objectReader.readValue(arg1); } catch (Exception e) { throw new RuntimeException("message deserialize error", e); } } } //对应的functional方法 @Bean public Function<Flux<Message<EntityTest>>, Flux<Message<String>>> process(MessageProcessService messageProcessService) { return messageFlux ->MessageProcessService.process(messageFlux); }
以上是关于基于 Spring functional 的Secured Kafka (kerberos) configuration的主要内容,如果未能解决你的问题,请参考以下文章
Spring Security - Thymeleaf - 我可以在 sec:authorize 标签中评估 SPEL 表达式吗?
spring security3 动态从数据库中读取权限信息<sec:authorize>标签 url属性不起作用
基于 Spring functional 的Secured Kafka (kerberos) configuration