如何通过 Spring Boot 的 rest 调用在数据准备好时传输?
Posted
技术标签:
【中文标题】如何通过 Spring Boot 的 rest 调用在数据准备好时传输?【英文标题】:How to transmit when data is ready through a rest call with Spring Boot? 【发布时间】:2021-12-06 12:34:04 【问题描述】:我有一个 ssh 管理器来在服务器上执行 (bash) 脚本。它包含一个commandWithContinousRead(String command, Consumer<String> consumer)
。每当在 bash 脚本中调用 echo 时,它都会被消费者使用。我想用 Spring Boot 和 HTTP 调用来扩展它。当客户端发送请求时,服务器会在数据准备就绪时从 bash 脚本流式传输数据,客户端可以将其打印出来。
我知道服务器发送事件,但是,我觉得这主要用于事件,并且通常在 API 上使用多个资源。
此外,我尝试搜索流媒体主题,但没有成功。我确实从 Spring 中找到了StreamingResponseBody
,但它会收集所有数据,然后一次性发送。
我使用 Postman 进行测试,也许它无法处理流媒体? 但是,我该如何测试呢?
例子:
#/bin/bash
# Scriptname: stream-this.sh
echo "Starting line"
sleep 4
echo "Middle line"
sleep 4
echo "End line"
使用commandWithContinousRead
请求,但在八秒后立即打印所有内容。
@RequestMapping(value = "/stream-this", method = RequestMethod.POST,
produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public ???? streamScript()
StreamingResponseBody stream = out ->
sshManager.commandWithContinousRead("bash /scripts/stream-this.sh", echo ->
try
byte[] bytes = echo.getBytes(StandardCharsets.UTF_8);
out.write(bytes);
System.out.println(echo);
catch (IOException e)
e.printStackTrace();
);
;
return new ResponseEntity<>(stream, HttpStatus.OK);
commandWithContinousRead
函数的实现。
public void commandWithContinousRead(String command, Consumer<String> consumer)
SSHClient client = buildClient();
try (Session session = client.startSession())
Session.Command cmd = session.exec(command);
BufferedReader br = new BufferedReader(new InputStreamReader(cmd.getInputStream(), StandardCharsets.UTF_8));
String line;
while ((line = br.readLine()) != null)
consumer.accept(line);
br.close();
catch (IOException e)
e.printStackTrace();
finally
try
client.disconnect();
catch (IOException e)
e.printStackTrace();
【问题讨论】:
【参考方案1】:现在您已经发布了commandWithContinuousRead
方法,一切看起来都是正确的。另外,您刚刚声明您正在使用 Postman 进行测试,这绝对是个问题——postman 不支持流式响应
https://github.com/postmanlabs/postman-app-support/issues/5040
以编程方式对代码进行单元和集成测试总是一个好主意。一个简单的单元测试甚至不需要使用 Spring 或真正的 SSH 连接(在测试本地运行 bash 脚本)。单元测试只是测试消费者的逻辑,并让您知道输出的读取和 bash 脚本本身没有阻塞。理想情况下,您会使用 junit,但这是我放在一起的一个简单的测试类,它说明了我的意思。
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;
import java.lang.Process;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
public class Test
// This would be a @Test instead of a main
public static void main(String... args)
commandWithContinousRead("bash stream-this.sh", echo ->
byte[] bytes = echo.getBytes(StandardCharsets.UTF_8);
// assert statements go here
System.out.println("In main -- " + echo);
);
public static void commandWithContinousRead(String command, Consumer<String> consumer)
try
Process process = Runtime.getRuntime().exec(command);
BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
String line;
while ((line = br.readLine()) != null)
consumer.accept(line);
br.close();
catch (IOException e)
e.printStackTrace();
集成测试实际上会设置 Spring,并通过端点,从而以与客户端/浏览器相同的方式进行测试。通常,这是使用 @WebMvcTest
和 mockMvc
异步完成的。您可以选择模拟 SSH 客户端,或者明确设置服务器,以便您的实际 SSH 客户端可以连接到它。 (第二个选项将暴露/消除与 ssh 连接相关的问题)。这种测试会暴露弹簧设置/流响应的问题。您需要在 5 秒后在模拟 mvc 上设置人工超时,并在 9 秒后使用新的模拟 mvc 这将让您看到 5 秒后收到第一个回声,9 秒后,你有整个预期的反应。一个很好的起点是查看https://www.tabnine.com/code/java/methods/org.springframework.test.web.servlet.result.RequestResultMatchers/asyncStarted
通过这两个级别的测试后,您就会开始怀疑客户,在这种情况下,就是 Postman。如果可能,请尝试使用将运行您的代码的实际浏览器或客户端。事实证明,流式传输可能不适合您。
请贴出commandWithContinousRead
的实现
这可能是一个基本问题,即正在回显和休眠的脚本与应该读取回显并打印出来的代码在同一线程上运行。即,您在等待 bash 脚本本身运行时处于阻塞状态,这将解释在获得任何输出之前的 8 秒延迟。另外,commandWithContinousRead
返回什么类型?根据您在该方法中“读取”回声的方式,您也可能会在那里阻塞。如果没有看到 commandWithContinousRead
的代码,很难 100% 肯定地说。
您的返回类型将是ResponseEntity<StreamingResponseBody>
(填写????)
【讨论】:
添加了实现。 感谢您解释如何测试实现。我找到了使用 SSE 的方法。【参考方案2】:好的,我想出了一个可行的解决方案。正如 Pickled Brain 所提到的,主要问题是 Postman 不使用流媒体。此外,我在一次调用中返回尝试 SSE,并通过在另一个线程中运行 bash 脚本来实现。此外,我在 Nodejs 中创建了一个 SSE 客户端用于测试目的,它运行良好。
运行脚本并将其放置在另一个线程中的函数。
private SseEmitter runScript()
SseEmitter emitter = new SseEmitter(-1L); // -1L = no timeout
ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor();
sseMvcExecutor.execute(() ->
try
shellManager.commandWithContinousRead("bash scriptname"), s ->
SseEmitter.SseEventBuilder event = SseEmitter.event().name("message").data(s);
try
emitter.send(event);
System.out.println(s);
catch (IOException e)
e.printStackTrace();
);
emitter.send(SseEmitter.event().name("close").data(""));
catch (IOException e)
e.printStackTrace();
emitter.complete();
);
return emitter;
上交所客户:
const EventSource = require('eventsource'); // npm install eventsource
const url = 'yoururl';
var es = new EventSource(url);
es.onopen = function(ev)
console.log("OPEN");
console.log(ev);
;
es.onmessage = function(ev)
console.log("MESSAGE");
console.log(ev.data);
;
es.addEventListener('close', function()
es.close();
console.log('closing!');
);
es.onerror = function(ev)
console.log("ERROR");
console.log(ev);
es.close();
;
process.on('SIGINT', () =>
es.close();
console.log(es.CLOSED);
);
【讨论】:
以上是关于如何通过 Spring Boot 的 rest 调用在数据准备好时传输?的主要内容,如果未能解决你的问题,请参考以下文章
Spring Boot提供RESTful接口时的错误处理实践
如何使用Azure AD B2C保护Spring Boot REST API?
使用 Spring Boot 和 Neo4j 通过 REST API 插入实体时出错
Elasticsearch:通过 Spring Boot 创建 REST APIs 来访问 Elasticsearch