将标准输出字节从 Process 转发到 WebSocket 或读取当前 InputStream 长度
Posted
技术标签:
【中文标题】将标准输出字节从 Process 转发到 WebSocket 或读取当前 InputStream 长度【英文标题】:Forward stdout bytes from Process to WebSocket or read current InputStream length 【发布时间】:2022-01-08 23:27:34 【问题描述】:我正在使用Jaffree 库通过 FFmpeg 转换和显示我的 RTSP 流。
我正在做与 node-rtsp-stream 库完全相同的事情,但使用 Java 而不是 Node.js
-
使用正确的参数和标准输出运行 ffmpeg。
捕获 stdOut 发出的数据包/字节。
使用 WebSocket 将它们发送给客户端。
我在客户端使用JSMpeg 来显示流 - 情况与 node-rtsp-stream 中的情况完全相同。
一切正常,但内存优化和消耗存在问题。
Jaffree 库中处理该过程的一段代码:
protected Executor startExecution(final Process process,
final AtomicReference<T> resultReference)
Executor executor = new Executor(contextName);
if (stdOutReader != null)
executor.execute("StdOut", new Runnable()
@Override
public void run()
T result = stdOutReader.read(process.getInputStream());
if (result != null)
boolean set = resultReference.compareAndSet(null, result);
if (!set)
LOGGER.warn("Ignored result of reading STD OUT: ", result);
);
我的 StdOutReader 实现:
private static final int BUFFER_SIZE = 2048000;
@Override
public T read(final InputStream stream)
Try.run(() ->
final byte[] buffer = new byte[BUFFER_SIZE];
int length;
while ((length = stream.read(buffer)) >= 0)
final byte[] finalBuffer = new byte[length];
System.arraycopy(buffer, 0, finalBuffer, 0, length);
socket.sendMessage(finalBuffer);
).onFailure(throwable ->
throw new RuntimeException("FFmpeg error", throwable);
);
return null;
在底层有一个简单的Process
,它返回InputStream
(实际上是BufferedInputStream
,这非常重要)。
我的问题是我不知道流中有多少字节,JSMpeg
需要特定的数字(帧/数据包)才能正确显示流而不会受到任何干扰。
所以我人为地创建了巨大的字节数组来处理最大的结果,但是我的记忆力受到了影响。
我只想使用 WebSocket 转发每个数据包。所以它应该是这样工作的(现在也是这样工作的):
StdOut sends packet -> passing packet to WebSocket -> StdOut sends packet -> passing packet to WebSocket -> ...
另一种选择是以某种方式预测InputStream
此时持有的字节数,但这对于BufferedInputStream
可能是不可能的。
-
我尝试使用
Reader
读取字符串行并将它们转换为字节,但它们返回的大小错误 - 这不起作用。
我尝试在 ffmpeg 上设置一些缓冲区/长度大小的数据包 - 这不起作用或流中断。
我注意到这些数据包是 188 字节的乘积,但即使将数据包转换为相似大小也会导致流中断(它们可能需要直接从 stdOut 获取大小)。
我试过readAllBytes()
,但循环结束了。
我用transferTo
和ByteArrayOutputStream
尝试了一些方法 - 没有任何帮助。
我已尝试检查 available()
的大小 InputStream
,但这也不起作用。
我尝试使用 Spring 5 WebFlux DataBuffer
将 Stream
转换为 Flux
并订阅,但我只能读取单个字节(或者我可能不知道如何读取整个当前缓冲区)。
也许有可能修改 JSMpeg 以接收静态大小的数据包(例如,我的意思总是 2048 字节) - 如果是这样,请告诉我。
或者我可以以某种方式从发送到 stdOut 的特定字节预测数据包的长度 - 如果是这样,请告诉我。
我怎样才能做到这一点,甚至有可能吗?
** 第一版 **
根据 Denis Kokorin 的回答,我已经使用 PipeOutput
实现了它,但它具有完全相同的限制。在后台PipeOutput
只是将流内容复制到new byte[]
,并使用预定义的缓冲区大小进行初始化。所以我得到这样的输出:
Array length: 262144, actual length: 9588
Array length: 262144, actual length: 4700
Array length: 262144, actual length: 9024
Array length: 262144, actual length: 10152
Array length: 262144, actual length: 18048
我需要发送具有实际长度的数组(使用 WebSocket)而不是数组长度。原因是JSMpeg 库的工作方式:
视频和音频的内部缓冲区非常小(512kb 和 分别为 128kb)和 JSMpeg 将丢弃旧的(甚至未播放的)数据 为新到达的数据腾出空间而没有太多的模糊性。这可以 当网络拥塞时引入解码伪影,但是 确保将延迟保持在最低限度。
JSMpeg 的默认缓冲区大小为 524 288
,这样就不会显示任何内容。
我很确定ChannelOutput
的情况会非常相似。如果没有,请告诉我如何使用 WebSocket 从SeekableByteChannel
传递数据。
您提到了 Spring,可能您也将它用于 WebSockets。 Spring 在 WebSockets 上使用 STOMP 协议。 WebSockets 几乎就像 TCP 套接字:它们允许您发送任意字节。 STOMP 类似 到 HTTP。因此可以通过 WebSocket 流式传输字节,但没有 跺脚。
是的,我没有直接提到它。我确实在使用带有 spring-boot-starter-websocket
包的 Spring 5。在我的特定情况下,我需要使用 STOMP,但我使用普通 Java 函数发送这些字节 - 我将 byte[]
作为参数 (SimpMessagingTemplate.convertAndSend
) 传递。
我还想提出一个替代解决方案:如果有什么 不适合内存 - 将其保存在磁盘上。
这可能不是我的选择。这是直播,不是标准视频,我必须尽可能降低延迟。
文件上传后立即编码 MJPEG。
它不是 MJPEG 格式 - 它是 mpegts
。 JSMpeg 是库的名称。 :)
ffmpeg 命令如下所示:
ffmpeg -loglevel level+info -i rtsp://test:test@10.10.10.10:554/stream -n -rtsp_transport tcp -f mpegts -codec:v mpeg1video -r 30 -s 480x320 -b:v 1500k -bf 0 -an -vf scale=480:320 -
【问题讨论】:
检查 PipeOutput.pumpTo(final OutputStream 目标,final int bufferSize)。您可以配置缓冲区大小。 @DenisKokorin 问题是bufferSize
是一个静态值。我不知道数据包长度是 9588、18048、108288 还是 4700。我想以某种方式预测/提取特定数据包的长度。例如,如果我将 bufferSize
初始化为 20000,则其余字节将作为零发送,这也会中断流式传输。
OutputStream 有方法 writeBytes(byte[] bytes, int offset, int length)。您可以使用 Arrays.copy() 填充大小长度的字节数组。
没错,但请注意,这正是我在上述代码中StdOutReader
实现(read
方法)中所做的。我需要创建一个巨大的缓冲区大小,然后为每次迭代(几毫秒)创建另一个数组 - 一遍又一遍。我正在寻找一种将数据包中的字节数直接传递到 websocket 的方法,或者如果不可能,则从InputStream
中提取(使用流read
方法)特定的数据包大小。有可能吗?
但是为什么需要巨大的缓冲区呢?将其设为 200_000 并在 OutputStream#write 中将所有可用字节复制到新字节数组并发送到 Web 套接字。
【参考方案1】:
我是 Jaffree 的作者。
首先,你不需要自定义StdReader
,你可以使用PipeOutput
:
var outputStream = new OutputStreamWebSocketWrapper(...);
FFmpeg.atPath()
.addInput(urlInput.fromUrl("path/to/file.mp4"))
.addOutput(
PipeOutput.pumpTo(outputStream)
.setFormat("flv")
)
.execute();
上面的OutputStreamWebSocketWrapper
应该是简单的OutputStream
实现,它将字节发送到WebSocket。
我的问题是我不知道流中有多少字节,而 JSMpeg 需要特定的数字(帧/数据包)才能正确显示流而不会受到任何干扰。所以我人为地创建了巨大的字节数组来处理最大的结果,但是我的记忆力受到了影响。
您的 HTTP Server 和 OutputStream
似乎有问题。默认情况下,一些服务器(几乎是我所知道的)首先从 OutputStream
读取 all 数据,然后才开始向客户端发送数据。他们这样做是因为 content-length
标头。但是可以指示一些服务器实时流式传输数据。在这种情况下,客户端不会得到content-length
(除非您自己指定值),因此不知道剩下多少数据。
您提到了 Spring,可能您也将它用于 WebSockets。 Spring 在 WebSockets 上使用 STOMP 协议。 WebSockets 几乎就像 TCP Sockets:它们允许您发送任意字节。 STOMP 类似于 HTTP。因此可以通过 WebSocket 流式传输字节,但无需 STOMP。
我还想提出一个替代解决方案:如果内存不适合 - 将其保存在磁盘上。您没有明确说明,但我怀疑您尝试将视频文件即时转换为 MJPEG 流并将其发送到客户端。无论如何,这种转换是 CPU 密集型的,并且可能会导致大量并发连接出现问题。此外,在动态内存中,您不能重用之前转换的数据。此外,WebSocket 不能像 HTTP 响应那样简单地缓存。
因此,您可以采用以下两种方式之一:
-
按请求编码(但一次):检查是否已经创建了 MJPEG,如果没有创建并提供它
文件上传后立即编码 MJPEG。将 MJPEG 作为通用文件提供
** 更新 **
在 cmets 讨论后,我建议以下 sn-p:
OutputStream websocketOutput = new OutputStream()
@Override
public void write(int b) throws IOException
// not implemented
@Override
public void write(byte[] bytes, int offset, int length) throws IOException
byte[] data = new byte[length];
System.arraycopy(bytes, offset, data, 0, length);
socket.sendMessage(data);
;
FFmpeg.atPath()
.addInput(UrlInput.fromUrl("rtsp://servername/streamname"))
.addOutput(
PipeOutput.pumpTo(websocketOutput, 200_000)
.setFormat("mpegts")
.setCodec(StreamType.VIDEO, "mpeg1video")
// other arguments here
)
.execute();
所以你不需要巨大的缓冲区,只需在每次迭代时将你得到的所有有意义的字节都传递给 WebSocket。
【讨论】:
PipeOutput
不幸的是没有按预期工作。我已经编辑了这个问题并提供了更多的澄清和解释。让我知道是否有办法实现我想要的。太感谢了。 :)
我已经添加了代码sn-p以上是关于将标准输出字节从 Process 转发到 WebSocket 或读取当前 InputStream 长度的主要内容,如果未能解决你的问题,请参考以下文章
perl open:如何将标准错误转发到标准输出? [复制]