我可以使用 Spring WebFlux 实现通过 Kafka 请求/响应主题获取数据的 REST 服务吗?

Posted

技术标签:

【中文标题】我可以使用 Spring WebFlux 实现通过 Kafka 请求/响应主题获取数据的 REST 服务吗?【英文标题】:Can I use Spring WebFlux to implement REST services which get data through Kafka request/response topics? 【发布时间】:2018-07-25 16:00:02 【问题描述】:

我正在开发 REST 服务,该服务反过来会查询缓慢的遗留系统,因此响应时间将以秒为单位。我们还预计会有大量负载,因此我正在考虑使用异步/非阻塞方法来避免数百个“servlet”线程在调用慢速系统时阻塞。

正如我所见,这可以使用新的 servlet API 规范中的 AsyncContext 来实现。我什至开发了小型原型,它似乎正在工作。

另一方面,我似乎可以使用 Spring WebFlux 实现相同的目标。 不幸的是,我没有找到任何使用 Mono/Flux 包装自定义“后端”调用的示例。大多数示例只是重用了已经准备好的响应式连接器,例如 ReactiveCassandraOperations.java 等。

我的数据流如下:

JS 客户端 --> Spring RestController --> 向 Kafka 主题发送请求 --> 从 Kafka 回复主题中读取响应 --> 向客户端返回数据

我可以将 Kafka 步骤包装到 Mono/Flux 中吗?如何做到这一点? 我的 RestController 方法应该是什么样子?

这是我使用 Servlet 3.1 API 实现相同效果的简单实现

//took the idea from some Jetty examples
public class AsyncRestServlet extends HttpServlet 
...
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException 

    String result = (String) req.getAttribute(RESULTS_ATTR);

    if (result == null)  //data not ready yet: schedule async processing
        final AsyncContext async = req.startAsync();

        //generate some unique request ID
        String uid = "req-" + String.valueOf(req.hashCode()); 

        //share it to Kafka receive together with AsyncContext
        //when Kafka receiver will get the response it will put it in Servlet request attribute and call async.dispatch()
        //This doGet() method will be called again and it will send the response to client
        receiver.rememberKey(uid, async); 

        //send request to Kafka
        sender.send(uid, param); 

        //data is not ready yet so we are releasing Servlet thread
        return;
    

    //return result as html response
    resp.setContentType("text/html");
    PrintWriter out = resp.getWriter();
    out.println(result);
    out.close();

【问题讨论】:

【参考方案1】:

这是一个简短的示例 - 不是您可能想到的 WebFlux 客户端,但至少它可以让您利用 Flux 和 Mono 进行异步处理,我将其解释为您的问题的重点。 Web 对象无需额外配置即可工作,但您当然需要配置 Kafka,因为 KafkaTemplate 对象无法单独工作。

    @Bean // Using org.springframework.web.reactive.function.server.RouterFunction<ServerResponse>
    public RouterFunction<ServerResponse> sendMessageToTopic(KafkaController kafkaController)
        return RouterFunctions.route(RequestPredicates.POST("/endpoint"), kafkaController::sendMessage);
    

    @Component
    public class ResponseHandler 
        public getServerResponse() 
            return ServerResponse.ok().body(Mono.just(Status.SUCCESS), String.class);
        
    

    @Component 
    public class KafkaController 
        public Mono<ServerResponse> auditInvalidTransaction(ServerRequest request) 
            return request.bodyToMono(TopicMsgMap.class) 
                // your HTTP call may not return immediately without this
                .subscribeOn(Schedulers.single()) // for a single worker thread
                .flatMap(topicMsgMap -> 
                    MyKafkaPublisher.sendMessages(topicMsgMap);
                .flatMap(responseHandler::getServerResponse);
        
    

    @Data // model class just to easily convert the ServerRequest (from json, for ex.)
    // + ~@constructors
    public class TopicMsgMap() 
        private Map<String, String> topicMsgMap;
    

    @Service // Using org.springframework.kafka.core.KafkaTemplate<String, String>
    public class MyKafkaPublisher 
        @Autowired
        private KafkaTemplate<String, String> template;

        @Value("$topic1")
        private String topic1;
        @Value("$topic2")
        private String topic2;

        public void sendMessages(Map<String, String> topicMsgMap)
            topicMsgMap.forEach((top, msg) -> 
                if (topic.equals("topic1") kafkaTemplate.send(topic1, message);
                if (topic.equals("topic2") kafkaTemplate.send(topic2, message);
            );
        
    

猜测这不是您想到的用例,但希望您发现这个通用结构有用。

【讨论】:

以上是关于我可以使用 Spring WebFlux 实现通过 Kafka 请求/响应主题获取数据的 REST 服务吗?的主要内容,如果未能解决你的问题,请参考以下文章

从另一个 API 端点动态触发现有 Flux - Spring Webflux

Spring Webflux WebFilterChain中的URL模式过滤

我可以在 Spring WebFlux 上实现 WebSecurityConfigurerAdapter

无法通过 repo 删除文档-> 使用 Webflux 在 Spring Boot 2.x 中删除

深入剖析 Spring WebFlux

Spring WebFlux:只允许一个连接接收订阅者