Spring Boot Rest api 与 Spring Kafka
Posted
技术标签:
【中文标题】Spring Boot Rest api 与 Spring Kafka【英文标题】:Spring boot Rest api with Spring Kafka 【发布时间】:2020-11-13 10:28:29 【问题描述】:我设计了一个 Spring Boot REST API,其中包含 ADD 和 GET 两个方法:
@RestController("ProductV1Controller")
public class ProductController
private final IProductProducer _productProducer;
public ProductController(IProductProducer productProducer)
_productProducer = productProducer;
@PostMapping()
void AddProduct(@Valid @RequestBody ProductViewModel product)
_productProducer.AddProduct(product);
@GetMapping()
List<ProductViewModel> Products()
var test = _productProducer.GetProducts();
return _productProducer.GetProducts();
服务层
@Service
public class ProductProducer implements IProductProducer
private final KafkaTemplate<String, Object> _template;
public ProductProducer(KafkaTemplate<String, Object> _template)
this._template = _template;
@Override
public List<ProductViewModel> GetProducts()
this._template.send(ProductTopicConstants.GET_PRODUCTS,null);
return List.of(new ProductViewModel("","",0,"")); --> Need to return the value from the kafka
@Override
public void AddProduct(ProductViewModel product)
this._template.send(ProductTopicConstants.ADD_PRODUCT, product);
Kafka 监听器
@KafkaListener(id = ProductTopicConstants.GET_PRODUCTS, topics = ProductTopicConstants.GET_PRODUCTS)
public List<Product> GetProducts()
return _productRepository.findAll();
在服务层的 GetProducts() 中,我需要返回由 _productRepository.findAll() 得到的项目列表。
使用 Spring Kafka 实现这种 REST API 的最佳方法是什么?
【问题讨论】:
【参考方案1】:您需要使用 ReplyingKafkaTemplate 将结果返回给 REST 控制器。
参见 ReplyingKafkaTemplate 的文档。
2.1.3 版引入了 KafkaTemplate 的一个子类来提供请求/回复语义。该类名为 ReplyingKafkaTemplate,除了超类中的方法之外,还新增了一个方法(即下面示例中使用的 sendAndReceive)。
该方法的返回值是一个 ListenableFuture,它会被异步地填入结果(或者异常、超时)。返回值还有一个 sendFuture 属性,它是调用 KafkaTemplate.send() 的结果;您可以使用这个 future 来确定发送操作的结果。
文档有一个例子。
编辑
@SpringBootApplication
@RestController
public class So63058608Application
private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);
public static void main(String[] args)
SpringApplication.run(So63058608Application.class, args);
@Autowired
private ReplyingKafkaTemplate<String, String, List<String>> replyTemplate;
@GetMapping(path = "/get")
public List<String> getThem() throws Exception
RequestReplyFuture<String, String, List<String>> future =
this.replyTemplate.sendAndReceive(new ProducerRecord<>("so63058608-1", 0, null, null));
LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
return future.get(10, TimeUnit.SECONDS).value();
@KafkaListener(id = "so63058608-1", topics = "so63058608-1", splitIterables = false)
@SendTo
public List<String> returnList(@Payload(required = false) String payload)
return new ArrayList<>(List.of("foo", "bar", "baz"));
@Bean
public ReplyingKafkaTemplate<String, String, List<String>> replyer(ProducerFactory<String, String> pf,
ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory)
containerFactory.setReplyTemplate(kafkaTemplate(pf));
ConcurrentMessageListenerContainer<String, List<String>> container = replyContainer(containerFactory);
ReplyingKafkaTemplate<String, String, List<String>> replyer = new ReplyingKafkaTemplate<>(pf, container);
return replyer;
@Bean
public ConcurrentMessageListenerContainer<String, List<String>> replyContainer(
ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory)
ConcurrentMessageListenerContainer<String, List<String>> container =
containerFactory.createContainer("so63058608-2");
container.getContainerProperties().setGroupId("so63058608-2");
container.setBatchErrorHandler(new BatchLoggingErrorHandler());
return container;
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf)
return new KafkaTemplate<>(pf);
@Bean
public NewTopic topic1()
return TopicBuilder.name("so63058608-1").partitions(1).replicas(1).build();
@Bean
public NewTopic topic3()
return TopicBuilder.name("so63058608-2").partitions(1).replicas(1).build();
spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
$ curl localhost:8080/get
["foo","bar","baz"]
EDIT2
并返回一些对象的列表...
@SpringBootApplication
@RestController
public class So63058608Application
private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);
public static void main(String[] args)
SpringApplication.run(So63058608Application.class, args);
@Autowired
private ReplyingKafkaTemplate<String, String, List<Foo>> replyTemplate;
@GetMapping(path = "/get")
public List<Foo> getThem() throws Exception
RequestReplyFuture<String, String, List<Foo>> future =
this.replyTemplate.sendAndReceive(new ProducerRecord<>("so63058608-1", 0, null, null));
LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Foo> result = future.get(10, TimeUnit.SECONDS).value();
LOG.info(result.toString());
return result;
@KafkaListener(id = "so63058608-1", topics = "so63058608-1", splitIterables = false)
@SendTo
public List<Foo> returnList(@Payload(required = false) String payload)
return new ArrayList<>(List.of(new Foo("foo"), new Foo("bar"), new Foo("baz")));
@Bean
public ReplyingKafkaTemplate<String, String, List<Foo>> replyer(ProducerFactory<String, String> pf,
ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory)
containerFactory.setReplyTemplate(kafkaTemplate(pf));
ConcurrentMessageListenerContainer<String, List<Foo>> container = replyContainer(containerFactory);
ReplyingKafkaTemplate<String, String, List<Foo>> replyer = new ReplyingKafkaTemplate<>(pf, container);
return replyer;
@Bean
public ConcurrentMessageListenerContainer<String, List<Foo>> replyContainer(
ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory)
ConcurrentMessageListenerContainer<String, List<Foo>> container =
containerFactory.createContainer("so63058608-2");
container.getContainerProperties().setGroupId("so63058608-2");
container.setBatchErrorHandler(new BatchLoggingErrorHandler());
return container;
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf)
return new KafkaTemplate<>(pf);
@Bean
public NewTopic topic1()
return TopicBuilder.name("so63058608-1").partitions(1).replicas(1).build();
@Bean
public NewTopic topic3()
return TopicBuilder.name("so63058608-2").partitions(1).replicas(1).build();
public static JavaType returnType(byte[] data, Headers headers)
return TypeFactory.defaultInstance()
.constructCollectionLikeType(List.class, Foo.class);
/**
 * Simple mutable bean with a single {@code bar} property, used as the element
 * type of the reply list.
 */
class Foo {

    private String bar;

    /** Creates an instance whose {@code bar} is null until {@link #setBar} is called. */
    public Foo() {
    }

    /**
     * @param bar initial value of the property
     */
    public Foo(String bar) {
        this.bar = bar;
    }

    /**
     * @return the current value of {@code bar}, possibly null
     */
    public String getBar() {
        return this.bar;
    }

    /**
     * @param bar new value of the property
     */
    public void setBar(String bar) {
        this.bar = bar;
    }

    /**
     * @return a string of the form {@code Foo [bar=<value>]}
     */
    @Override
    public String toString() {
        return "Foo [bar=" + this.bar + "]";
    }
}
spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.So63058608Application.returnType
[Foo [bar=foo], Foo [bar=bar], Foo [bar=baz]]
【讨论】:
是否有任何带有 REST API 的示例应用程序?
没有,但我在答案中添加了一个示例。
如何传递字符串或对象?当我尝试 RequestReplyFuture……(原文在此处被截断)
以上是关于Spring Boot Rest api 与 Spring Kafka的主要内容,如果未能解决你的问题,请参考以下文章
Spring Boot Rest API 返回与 Lombok 一起使用的空 JSON
7. Spring Boot2.5 安全机制与 REST API 身份验证实战
如果 xml 元素命名约定与 POJO 属性命名约定不同,则发送到 Spring Boot REST API 的 XML 元素不会映射到 POJO
Spring Boot 2 Rest Api Example
如果xml元素命名约定与POJO属性命名约定不同,则发送到Spring Boot REST API的XML元素不会映射到POJO
集成 Angular + Nodejs + Spring Boot Java REST API 使 Angular Universal 工作