我可以使用 Spring WebFlux 实现通过 Kafka 请求/响应主题获取数据的 REST 服务吗?
Posted
技术标签:
【中文标题】我可以使用 Spring WebFlux 实现通过 Kafka 请求/响应主题获取数据的 REST 服务吗?【英文标题】:Can I use Spring WebFlux to implement REST services which get data through Kafka request/response topics? 【发布时间】:2018-07-25 16:00:02 【问题描述】:我正在开发 REST 服务,该服务反过来会查询缓慢的遗留系统,因此响应时间将以秒为单位。我们还预计会有大量负载,因此我正在考虑使用异步/非阻塞方法来避免数百个“servlet”线程在调用慢速系统时阻塞。
正如我所见,这可以使用新的 servlet API 规范中的 AsyncContext 来实现。我什至开发了小型原型,它似乎正在工作。
另一方面,我似乎可以使用 Spring WebFlux 实现相同的目标。 不幸的是,我没有找到任何使用 Mono/Flux 包装自定义“后端”调用的示例。大多数示例只是重用了已经准备好的响应式连接器,例如 ReactiveCassandraOperations.java 等。
我的数据流如下:
JS 客户端 --> Spring RestController --> 向 Kafka 主题发送请求 --> 从 Kafka 回复主题中读取响应 --> 向客户端返回数据
我可以将 Kafka 步骤包装到 Mono/Flux 中吗?如何做到这一点? 我的 RestController 方法应该是什么样子?
这是我使用 Servlet 3.1 API 实现相同效果的简单实现
//took the idea from some Jetty examples
public class AsyncRestServlet extends HttpServlet
...
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
String result = (String) req.getAttribute(RESULTS_ATTR);
if (result == null) //data not ready yet: schedule async processing
final AsyncContext async = req.startAsync();
//generate some unique request ID
String uid = "req-" + String.valueOf(req.hashCode());
//share it to Kafka receive together with AsyncContext
//when Kafka receiver will get the response it will put it in Servlet request attribute and call async.dispatch()
//This doGet() method will be called again and it will send the response to client
receiver.rememberKey(uid, async);
//send request to Kafka
sender.send(uid, param);
//data is not ready yet so we are releasing Servlet thread
return;
//return result as html response
resp.setContentType("text/html");
PrintWriter out = resp.getWriter();
out.println(result);
out.close();
【问题讨论】:
【参考方案1】:这是一个简短的示例 - 不是您可能想到的 WebFlux 客户端,但至少它可以让您利用 Flux 和 Mono 进行异步处理,我将其解释为您的问题的重点。 Web 对象无需额外配置即可工作,但您当然需要配置 Kafka,因为 KafkaTemplate 对象无法单独工作。
@Bean // Using org.springframework.web.reactive.function.server.RouterFunction<ServerResponse>
public RouterFunction<ServerResponse> sendMessageToTopic(KafkaController kafkaController)
return RouterFunctions.route(RequestPredicates.POST("/endpoint"), kafkaController::sendMessage);
@Component
public class ResponseHandler
public getServerResponse()
return ServerResponse.ok().body(Mono.just(Status.SUCCESS), String.class);
@Component
public class KafkaController
public Mono<ServerResponse> auditInvalidTransaction(ServerRequest request)
return request.bodyToMono(TopicMsgMap.class)
// your HTTP call may not return immediately without this
.subscribeOn(Schedulers.single()) // for a single worker thread
.flatMap(topicMsgMap ->
MyKafkaPublisher.sendMessages(topicMsgMap);
.flatMap(responseHandler::getServerResponse);
@Data // model class just to easily convert the ServerRequest (from json, for ex.)
// + ~@constructors
public class TopicMsgMap()
private Map<String, String> topicMsgMap;
@Service // Using org.springframework.kafka.core.KafkaTemplate<String, String>
public class MyKafkaPublisher
@Autowired
private KafkaTemplate<String, String> template;
@Value("$topic1")
private String topic1;
@Value("$topic2")
private String topic2;
public void sendMessages(Map<String, String> topicMsgMap)
topicMsgMap.forEach((top, msg) ->
if (topic.equals("topic1") kafkaTemplate.send(topic1, message);
if (topic.equals("topic2") kafkaTemplate.send(topic2, message);
);
猜测这不是您想到的用例,但希望您发现这个通用结构有用。
【讨论】:
以上是关于我可以使用 Spring WebFlux 实现通过 Kafka 请求/响应主题获取数据的 REST 服务吗?的主要内容,如果未能解决你的问题,请参考以下文章
从另一个 API 端点动态触发现有 Flux - Spring Webflux
Spring Webflux WebFilterChain中的URL模式过滤
我可以在 Spring WebFlux 上实现 WebSecurityConfigurerAdapter