spring boot kafka 通用 JSON 模板发送方
Posted
技术标签:
【中文标题】spring boot kafka 通用 JSON 模板发送方【英文标题】:spring boot kafka generic JSON templateSender 【发布时间】:2020-12-22 19:16:30 【问题描述】:package com.bankia.apimanager.config;
import com.bankia.apimanager.model.RequestDTO;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfiguration
@Value("$spring.kafka.bootstrap-servers")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs()
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
@Bean
public ProducerFactory<String, RequestDTO> producerFactory()
return new DefaultKafkaProducerFactory<>(producerConfigs());
@Bean
public KafkaTemplate<String, RequestDTO> kafkaTemplate()
return new KafkaTemplate<>(producerFactory());
package com.bankia.apimanager.controller;
import com.bankia.apimanager.model.RequestDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/infrastructure")
public class InfraStructureRequestController
private final static Logger LOG = LoggerFactory.getLogger( InfraStructureRequestController.class );
private static final String TOPIC = "test";
@Autowired
private KafkaTemplate<String, RequestDTO> sender;
@RequestMapping(value = "/test", method = RequestMethod.GET)
public String postMessage()
ListenableFuture<SendResult<String, RequestDTO>> future = sender.send(TOPIC, new RequestDTO("Hola","Paco"));
future.addCallback(new ListenableFutureCallback<SendResult<String, RequestDTO>>()
@Override
public void onSuccess(SendResult<String, RequestDTO> result)
LOG.info("Sent message with offset=[" + result.getRecordMetadata().offset() + "]");
@Override
public void onFailure(Throwable ex)
LOG.error("Unable to send message due to : " + ex.getMessage());
);
return "OK";
但是如果我现在想发送一个新的 DTO 对象呢?我是否必须声明一个新的KafkaTemplate<String,NEWOBJECT>
并自动装配在配置中为每个对象声明的每个 kafka 模板?有没有另一种方法,只声明一个 kafkaTemplate,就能用它发送任何类型的对象并自动序列化为 JSON?
【问题讨论】:
【参考方案1】:我认为,您可以指定一个通用的KafkaTemplate<String, Object>
并将生产者值序列化器设置为JsonSerializer
,如下所示:
@Configuration
public class KafkaConfiguration
@Value("$spring.kafka.bootstrap-servers")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs()
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
@Bean
public ProducerFactory<String, Object> producerFactory()
return new DefaultKafkaProducerFactory<>(producerConfigs());
@Bean
public KafkaTemplate<String, Object> kafkaTemplate()
return new KafkaTemplate<>(producerFactory());
【讨论】:
您能否澄清一下到底是什么不起作用?谢谢!【参考方案2】:引用您的代码:
@Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; }
Value Serializer 被正确定义为 JsonSerializer,它可以将任何类型的对象转换为 JSON。因此,请在 KafkaConfig 和 Controller 中的每一处将 <String, RequestDTO> 改为 <String, Object>。
请记住,泛型信息仅保留到编译时(类型擦除)。
【讨论】:
【参考方案3】:有两种情况:
场景 #1
如果您想使用KafkaTemplate
将任何类型(如您的问题中提到的)发送到kafka,则无需声明自己的KafkaTemplate
bean,因为Spring boot 在KafkaAutoConfiguration
中为您完成了此操作。
package org.springframework.boot.autoconfigure.kafka;
...
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import( KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class )
public class KafkaAutoConfiguration
private final KafkaProperties properties;
public KafkaAutoConfiguration(KafkaProperties properties)
this.properties = properties;
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter)
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate;
**Some Note**
:
此配置类已用 @ConditionalOnClass(KafkaTemplate.class)
注释,这意味着:(来自 spring 文档--->)@Conditional 仅在指定类位于类路径上时匹配。
kafkaTemplate bean 方法被注释为
@ConditionalOnMissingBean(KafkaTemplate.class)
表示:(来自 spring 文档 ---->)@Conditional 仅在 BeanFactory 中尚未包含满足指定要求的 bean 时才匹配。
重要!在纯 Java 世界中,KafkaTemplate<?, ?>
不是例如 KafkaTemplate<String, RequestDTO>
的子类型,因此您不能这样做:
// Illustration only: a wildcard-typed template cannot be assigned to a
// concretely parameterized one, because parameterized types are invariant.
KafkaTemplate<?, ?> kf1 = ...;
KafkaTemplate<String, RequestDTO> kf2 = kf1; // Compile time error
因为 Java 参数化类型是不变的(invariant),如《Effective Java》第三版第 31 条所述。但在 Spring 中这是可以的:该 bean 会被注入到您自己的服务中,您只需在 kafkaTemplate 字段上指定自己的泛型类型。例如:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaService
@Autowired
private KafkaTemplate<Integer, String> kafkaTemplate1;
@Autowired
private KafkaTemplate<Integer, RequestDTO> KafkaTemplate2;
场景 #2
如果您需要限制 kafka 记录的值类型,那么您需要指定自己的 kafka bean,如下所示:
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(CorridorTracingConfiguration.class)
public class CorridorKafkaAutoConfiguration
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, AbstractMessage> kafkaTemplate(ProducerFactory<Object, AbstractMessage> kafkaProducerFactory,
ProducerListener<Object, AbstractMessage> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter)
KafkaTemplate<Object, AbstractMessage> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate;
现在这只能注入到
KafkaTemplate<String, AbstractMessage> kafkaTemplate
,其中键(key)类型不一定是 String,也可以是其他任何类型
。但是您可以通过它将AbstractMessage
的任何子类型发送到kafka。
示例用法:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaService
@Autowired
private KafkaTemplate<String, AbstractMessage> kafkaTemplate;
public void makeTrx(TrxRequest trxRequest)
kafkaTemplate.send("fraud-request", trxRequest.fromAccountNumber(), new FraudRequest(trxRequest));
@Accessors(chain = true)
@Getter
@Setter
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class FraudRequest extends AbstractMessage
private float amount;
private String fromAccountNumber;
private String toAccountNumber;
...
要限制 Kafka 消息的键(key)类型,请采用与上面相同的方式。
【讨论】:
以上是关于spring boot kafka 通用 JSON 模板发送方的主要内容,如果未能解决你的问题,请参考以下文章
docker快速安装kafka,zookeeper ,体验spring-boot-demo-mq-kafka