Spring Boot REST API with Spring Kafka

Posted: 2020-11-13 10:28:29

Question:

I have designed a Spring Boot REST API with ADD and GET methods:

    @RestController("ProductV1Controller")
    public class ProductController {

        private final IProductProducer _productProducer;

        public ProductController(IProductProducer productProducer) {
            _productProducer = productProducer;
        }

        @PostMapping()
        void AddProduct(@Valid @RequestBody ProductViewModel product) {
            _productProducer.AddProduct(product);
        }

        @GetMapping()
        List<ProductViewModel> Products() {
            var test = _productProducer.GetProducts();
            return _productProducer.GetProducts();
        }
    }

Service layer:

    @Service
    public class ProductProducer implements IProductProducer {

        private final KafkaTemplate<String, Object> _template;

        public ProductProducer(KafkaTemplate<String, Object> _template) {
            this._template = _template;
        }

        @Override
        public List<ProductViewModel> GetProducts() {
            this._template.send(ProductTopicConstants.GET_PRODUCTS, null);
            // Need to return the value from Kafka here
            return List.of(new ProductViewModel("", "", 0, ""));
        }

        @Override
        public void AddProduct(ProductViewModel product) {
            this._template.send(ProductTopicConstants.ADD_PRODUCT, product);
        }
    }

Kafka listener:

    @KafkaListener(id = ProductTopicConstants.GET_PRODUCTS, topics = ProductTopicConstants.GET_PRODUCTS)
    public List<Product> GetProducts() {
        return _productRepository.findAll();
    }

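In the service layer's GetProducts(), I need to return the list of items that comes from _productRepository.findAll() in the listener.

What is the best way to implement a REST API with Spring Kafka?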


Answer 1:

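You need to use ReplyingKafkaTemplate to return the result to the REST controller.

See ReplyingKafkaTemplate.

Version 2.1.3 introduced a subclass of KafkaTemplate to provide request/reply semantics. The class is named ReplyingKafkaTemplate and has one method (in addition to those in the superclass).

The result is a ListenableFuture that is asynchronously populated with the result (or an exception, for a timeout). The result also has a sendFuture property, which is the result of calling KafkaTemplate.send(). You can use this future to determine the result of the send operation.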
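To make the request/reply idea concrete, here is a minimal in-memory sketch of the correlation pattern that such a template relies on: each request carries a correlation id, the replying side copies that id onto its reply, and the requesting side uses it to complete the matching future. This is not Spring Kafka code. The class RequestReplySketch, its methods, and the two queues standing in for the request and reply topics are all hypothetical names chosen for illustration, and CompletableFuture stands in for the future type returned by the real template.

```java
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/** In-memory illustration of request/reply correlation (hypothetical, not Spring Kafka). */
class RequestReplySketch {

    /** A payload plus a correlation id, standing in for a Kafka record with headers. */
    static final class Message {
        final String correlationId;
        final String payload;

        Message(String correlationId, String payload) {
            this.correlationId = correlationId;
            this.payload = payload;
        }
    }

    // Stand-ins for the request topic and the reply topic.
    private final Queue<Message> requests = new ConcurrentLinkedQueue<>();
    private final Queue<Message> replies = new ConcurrentLinkedQueue<>();

    // Futures still waiting for a reply, keyed by correlation id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Requesting side: publish a request and return a future for its reply. */
    CompletableFuture<String> sendAndReceive(String payload) {
        String id = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(id, future);
        requests.add(new Message(id, payload));
        return future;
    }

    /** Replying side: handle one request and publish a reply with the same correlation id. */
    boolean serveOne() {
        Message request = requests.poll();
        if (request == null) {
            return false; // nothing to handle
        }
        replies.add(new Message(request.correlationId, "reply to " + request.payload));
        return true;
    }

    /** Requesting side: consume one reply and complete the future it belongs to. */
    boolean dispatchOneReply() {
        Message reply = replies.poll();
        if (reply == null) {
            return false; // no reply available yet
        }
        CompletableFuture<String> future = pending.remove(reply.correlationId);
        if (future != null) {
            future.complete(reply.payload);
        }
        return true;
    }

    public static void main(String[] args) {
        RequestReplySketch sketch = new RequestReplySketch();
        CompletableFuture<String> future = sketch.sendAndReceive("get-products");
        sketch.serveOne();         // the listener side produces the reply
        sketch.dispatchOneReply(); // the reply is matched to the waiting future
        System.out.println(future.join()); // prints "reply to get-products"
    }
}
```

In a real application the two queues are Kafka topics and the matching is done by the template and its reply container, so none of this bookkeeping has to be written by hand.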

The documentation has an example.

EDIT

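import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.BatchLoggingErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class So63058608Application {

    private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So63058608Application.class, args);
    }

    @Autowired
    private ReplyingKafkaTemplate<String, String, List<String>> replyTemplate;

    // REST endpoint: sends a request record and waits for the reply.
    @GetMapping(path = "/get")
    public List<String> getThem() throws Exception {
        RequestReplyFuture<String, String, List<String>> future =
                this.replyTemplate.sendAndReceive(new ProducerRecord<>("so63058608-1", 0, null, null));
        LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
        return future.get(10, TimeUnit.SECONDS).value();
    }

    // Listener on the request topic; @SendTo routes the return value to the reply topic.
    @KafkaListener(id = "so63058608-1", topics = "so63058608-1", splitIterables = false)
    @SendTo
    public List<String> returnList(@Payload(required = false) String payload) {
        return new ArrayList<>(List.of("foo", "bar", "baz"));
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, List<String>> replyer(ProducerFactory<String, String> pf,
            ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory) {

        containerFactory.setReplyTemplate(kafkaTemplate(pf));
        ConcurrentMessageListenerContainer<String, List<String>> container = replyContainer(containerFactory);
        ReplyingKafkaTemplate<String, String, List<String>> replyer = new ReplyingKafkaTemplate<>(pf, container);
        return replyer;
    }

    // Container that consumes the reply topic on behalf of the ReplyingKafkaTemplate.
    @Bean
    public ConcurrentMessageListenerContainer<String, List<String>> replyContainer(
            ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory) {

        ConcurrentMessageListenerContainer<String, List<String>> container =
                containerFactory.createContainer("so63058608-2");
        container.getContainerProperties().setGroupId("so63058608-2");
        container.setBatchErrorHandler(new BatchLoggingErrorHandler());
        return container;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }

    // Request topic.
    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so63058608-1").partitions(1).replicas(1).build();
    }

    // Reply topic.
    @Bean
    public NewTopic topic3() {
        return TopicBuilder.name("so63058608-2").partitions(1).replicas(1).build();
    }

}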

spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.spring.json.trusted.packages=*

spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

$ curl localhost:8080/get
["foo","bar","baz"]
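The controller above blocks on future.get(10, TimeUnit.SECONDS), so it is worth deciding what the endpoint should do when no reply arrives in time or the request fails. The sketch below shows the three outcomes of a timed get using only the standard Future contract. It is an illustration under assumptions: CompletableFuture stands in for the reply future, and ReplyTimeoutSketch and await are hypothetical names, not part of Spring Kafka.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Illustrates the outcomes of waiting on a reply future with a timeout (hypothetical helper). */
class ReplyTimeoutSketch {

    /** Waits up to timeoutMillis for the reply and describes what happened. */
    static String await(CompletableFuture<String> reply, long timeoutMillis) {
        try {
            return "reply: " + reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // No reply arrived within the time the caller was willing to wait.
            return "timed out";
        } catch (ExecutionException e) {
            // The future was completed exceptionally; the cause carries the real error.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can react to it.
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // A reply that has already arrived.
        System.out.println(await(CompletableFuture.completedFuture("[foo, bar, baz]"), 100));
        // A reply that never arrives.
        System.out.println(await(new CompletableFuture<>(), 100));
        // A request that failed.
        System.out.println(await(CompletableFuture.failedFuture(new IllegalStateException("no listener")), 100));
    }
}
```

In a REST controller, the timeout and failure branches would typically be translated into an error response rather than a string; which status code to use is a design choice for the API.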

EDIT2

And to return a list of some object type...

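import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.BatchLoggingErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.type.TypeFactory;

@SpringBootApplication
@RestController
public class So63058608Application {

    private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So63058608Application.class, args);
    }

    @Autowired
    private ReplyingKafkaTemplate<String, String, List<Foo>> replyTemplate;

    // REST endpoint: sends a request record and waits for the reply.
    @GetMapping(path = "/get")
    public List<Foo> getThem() throws Exception {
        RequestReplyFuture<String, String, List<Foo>> future =
                this.replyTemplate.sendAndReceive(new ProducerRecord<>("so63058608-1", 0, null, null));
        LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
        List<Foo> result = future.get(10, TimeUnit.SECONDS).value();
        LOG.info(result.toString());
        return result;
    }

    // Listener on the request topic; @SendTo routes the return value to the reply topic.
    @KafkaListener(id = "so63058608-1", topics = "so63058608-1", splitIterables = false)
    @SendTo
    public List<Foo> returnList(@Payload(required = false) String payload) {
        return new ArrayList<>(List.of(new Foo("foo"), new Foo("bar"), new Foo("baz")));
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, List<Foo>> replyer(ProducerFactory<String, String> pf,
            ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory) {

        containerFactory.setReplyTemplate(kafkaTemplate(pf));
        ConcurrentMessageListenerContainer<String, List<Foo>> container = replyContainer(containerFactory);
        ReplyingKafkaTemplate<String, String, List<Foo>> replyer = new ReplyingKafkaTemplate<>(pf, container);
        return replyer;
    }

    // Container that consumes the reply topic on behalf of the ReplyingKafkaTemplate.
    @Bean
    public ConcurrentMessageListenerContainer<String, List<Foo>> replyContainer(
            ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory) {

        ConcurrentMessageListenerContainer<String, List<Foo>> container =
                containerFactory.createContainer("so63058608-2");
        container.getContainerProperties().setGroupId("so63058608-2");
        container.setBatchErrorHandler(new BatchLoggingErrorHandler());
        return container;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }

    // Request topic.
    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so63058608-1").partitions(1).replicas(1).build();
    }

    // Reply topic.
    @Bean
    public NewTopic topic3() {
        return TopicBuilder.name("so63058608-2").partitions(1).replicas(1).build();
    }

    // Referenced by the spring.json.value.type.method property; tells the
    // JsonDeserializer to build a List<Foo> from the reply payload.
    public static JavaType returnType(byte[] data, Headers headers) {
        return TypeFactory.defaultInstance()
                .constructCollectionLikeType(List.class, Foo.class);
    }

}

class Foo {

    private String bar;

    public Foo() {
    }

    public Foo(String bar) {
        this.bar = bar;
    }

    public String getBar() {
        return this.bar;
    }

    public void setBar(String bar) {
        this.bar = bar;
    }

    @Override
    public String toString() {
        return "Foo [bar=" + this.bar + "]";
    }

}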

spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.So63058608Application.returnType

[Foo [bar=foo], Foo [bar=bar], Foo [bar=baz]]
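A likely reason the second example needs the returnType method, while the List<String> example did not, is Java's type erasure: at runtime a List<Foo> is just an ArrayList, so the element type cannot be read from the list object itself and has to be supplied to the deserializer separately. The short sketch below demonstrates only the erasure part, using the standard library. ErasureSketch and sameRuntimeClass are hypothetical names, and the link to the deserializer's behaviour is my reading of the example rather than something the answer states.

```java
import java.util.ArrayList;
import java.util.List;

/** Shows that the element type of a generic list is not visible at runtime. */
class ErasureSketch {

    /** Placeholder element type, standing in for the Foo class of the example. */
    static class Foo { }

    /** Returns true when lists with different element types share one runtime class. */
    static boolean sameRuntimeClass() {
        List<Foo> foos = new ArrayList<>();
        List<String> strings = new ArrayList<>();
        return foos.getClass() == strings.getClass();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());                      // prints "true"
        System.out.println(new ArrayList<Foo>().getClass().getName()); // prints "java.util.ArrayList"
    }
}
```

Because the runtime class carries no element type, a method such as returnType that builds the full List<Foo> type explicitly gives the deserializer the information it would otherwise lack.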

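Comments:

Is there any sample application with a REST API?

No, but I added an example to the answer.

How do I pass a string or an object? When I try RequestReplyFuture future = this._replyTemplate.sendAndReceive(new ProducerRecord(ProductTopicConstants.GET_PRODUCT, 0, null,"uu7777uuuuuuu")); I get an exception: cannot convert from JSON.

Don't put code in comments; it doesn't render well. Edit the question instead and include the full stack trace, then comment that you have done so, so that I get notified.

I added a new question: ***.com/questions/63099836/…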
