Spring Boot Kafka generic JSON template sender


【Title】spring boot kafka generic JSON templateSender 【Posted】2020-12-22 19:16:30 【Question】:
package com.bankia.apimanager.config;

import com.bankia.apimanager.model.RequestDTO;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, RequestDTO> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, RequestDTO> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

package com.bankia.apimanager.controller;

import com.bankia.apimanager.model.RequestDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/infrastructure")
public class InfraStructureRequestController {

    private final static Logger LOG = LoggerFactory.getLogger( InfraStructureRequestController.class );

    private static final String TOPIC = "test";

    @Autowired
    private KafkaTemplate<String, RequestDTO> sender;

    @RequestMapping(value = "/test", method = RequestMethod.GET)
    public String postMessage() {

        ListenableFuture<SendResult<String, RequestDTO>> future = sender.send(TOPIC, new RequestDTO("Hola","Paco"));
        future.addCallback(new ListenableFutureCallback<SendResult<String, RequestDTO>>() {
            @Override
            public void onSuccess(SendResult<String, RequestDTO> result) {
                LOG.info("Sent message with offset=[" + result.getRecordMetadata().offset() + "]");
            }
            @Override
            public void onFailure(Throwable ex) {
                LOG.error("Unable to send message due to : " + ex.getMessage());
            }
        });
        return "OK";
    }
}
But what if I now want to send a different DTO object? Do I have to declare a new KafkaTemplate<String, NEWOBJECT> and autowire every single Kafka template declared in the configuration, one per object? Or is there a way to declare just one kafkaTemplate with which I can send any type of object and have it automatically serialized to JSON?

【Comments】:

【Answer 1】:

I think you can declare a generic KafkaTemplate<String, Object> and set the producer value serializer to JsonSerializer, like this:

@Configuration
public class KafkaConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
【Comments】:

Could you clarify what exactly is not working? Thanks!

【Answer 2】:

Quoting your code:

The value serializer is already correctly defined as JsonSerializer, which will convert an object of any type to JSON.
@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return props;
}
Change <String, RequestDTO> to <String, Object> everywhere in KafkaConfig and in the Controller (see the sketch below).

Keep in mind that generics are only retained up to compile time (type erasure).
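For illustration, here is a minimal sketch of the controller after that change (OtherDTO is a hypothetical second payload class, not part of the original code; imports are the same as in the original controller):

@RestController
@RequestMapping("/infrastructure")
public class InfraStructureRequestController {

    private static final String TOPIC = "test";

    // One generic template now covers every payload type
    @Autowired
    private KafkaTemplate<String, Object> sender;

    @RequestMapping(value = "/test", method = RequestMethod.GET)
    public String postMessage() {
        // Both objects are turned into JSON by the JsonSerializer configured above
        sender.send(TOPIC, new RequestDTO("Hola", "Paco"));
        sender.send(TOPIC, new OtherDTO(42, "something else")); // OtherDTO is hypothetical
        return "OK";
    }
}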

【Comments】:

【Answer 3】:

There are two scenarios:

Scenario #1

If you want to use a KafkaTemplate to send any type (as mentioned in your question) to Kafka, there is no need to declare your own KafkaTemplate bean, because Spring Boot already does that for you in KafkaAutoConfiguration.

package org.springframework.boot.autoconfigure.kafka;

...

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {

    private final KafkaProperties properties;

    public KafkaAutoConfiguration(KafkaProperties properties) {
        this.properties = properties;
    }

    @Bean
    @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
            ProducerListener<Object, Object> kafkaProducerListener,
            ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }
}


**Some notes:**

    This configuration class is annotated with @ConditionalOnClass(KafkaTemplate.class), which means (from the Spring documentation): the @Conditional only matches when the specified classes are on the classpath.

    The kafkaTemplate bean method is annotated with @ConditionalOnMissingBean(KafkaTemplate.class), which means (from the Spring documentation): the @Conditional only matches when no bean meeting the specified requirements is already contained in the BeanFactory.

    Important! In the pure Java world, KafkaTemplate<?, ?> is not a subtype of, for example, KafkaTemplate<String, RequestDTO>, so you cannot do this:

KafkaTemplate<?, ?> kf1 = ...;
KafkaTemplate<String, RequestDTO> kf2 = kf1; // Compile time error

because Java parameterized types are invariant, as discussed in Item 31 of Effective Java, 3rd Edition. In the Spring world, however, this is fine, and the auto-configured template will be injected into your own service; you just need to declare the generic types you want on your kafkaTemplate field. For example:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaService {
    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate1;
    @Autowired
    private KafkaTemplate<Integer, RequestDTO> kafkaTemplate2;
}

Scenario #2

If you need to restrict the value type of the Kafka records, then you need to declare your own KafkaTemplate bean, like this:


@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(CorridorTracingConfiguration.class)
public class CorridorKafkaAutoConfiguration {

    // KafkaProperties is needed for setDefaultTopic(...) below
    private final KafkaProperties properties;

    public CorridorKafkaAutoConfiguration(KafkaProperties properties) {
        this.properties = properties;
    }

    @Bean
    @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<?, AbstractMessage> kafkaTemplate(ProducerFactory<Object, AbstractMessage> kafkaProducerFactory,
                                                           ProducerListener<Object, AbstractMessage> kafkaProducerListener,
                                                           ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, AbstractMessage> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }
}
Now this can only be injected as KafkaTemplate<String, AbstractMessage> kafkaTemplate (the key type can be anything else instead of String), but through it you can send any subtype of AbstractMessage to Kafka.

Example usage:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaService {
    @Autowired
    private KafkaTemplate<String, AbstractMessage> kafkaTemplate;

    public void makeTrx(TrxRequest trxRequest) {
        kafkaTemplate.send("fraud-request", trxRequest.fromAccountNumber(), new FraudRequest(trxRequest));
    }
}



@Accessors(chain = true)
@Getter
@Setter
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class FraudRequest extends AbstractMessage {
    private float amount;
    private String fromAccountNumber;
    private String toAccountNumber;

    ...
}



To restrict the key of the Kafka message, follow the same approach as above.
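By analogy with the value-restricted bean above, a minimal sketch of a key-restricted template could look like the following (AccountKey is a hypothetical key class used only for illustration, and the same injection caveats apply):

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
public class KeyRestrictedKafkaConfiguration {

    @Bean
    @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<AccountKey, ?> kafkaTemplate(ProducerFactory<AccountKey, Object> kafkaProducerFactory) {
        // The key type is fixed to AccountKey (hypothetical); the value type stays open,
        // so any JSON-serializable object can still be sent as the record value.
        return new KafkaTemplate<>(kafkaProducerFactory);
    }
}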

【Comments】:
