将标准输出字节从 Process 转发到 WebSocket 或读取当前 InputStream 长度

Posted

技术标签:

【中文标题】将标准输出字节从 Process 转发到 WebSocket 或读取当前 InputStream 长度【英文标题】:Forward stdout bytes from Process to WebSocket or read current InputStream length 【发布时间】:2022-01-08 23:27:34 【问题描述】:

我正在使用Jaffree 库通过 FFmpeg 转换和显示我的 RTSP 流。

我正在做与 node-rtsp-stream 库完全相同的事情,但使用 Java 而不是 Node.js

    使用正确的参数和标准输出运行 ffmpeg。 捕获 stdOut 发出的数据包/字节。 使用 WebSocket 将它们发送给客户端。

我在客户端使用JSMpeg 来显示流 - 情况与 node-rtsp-stream 中的情况完全相同。

一切正常,但内存优化和消耗存在问题。

Jaffree 库中处理该过程的一段代码:

protected Executor startExecution(final Process process,
                                  final AtomicReference<T> resultReference) 
    Executor executor = new Executor(contextName);

    if (stdOutReader != null) 
        executor.execute("StdOut", new Runnable() 
            @Override
            public void run() 
                T result = stdOutReader.read(process.getInputStream());
                if (result != null) 
                    boolean set = resultReference.compareAndSet(null, result);
                    if (!set) 
                        LOGGER.warn("Ignored result of reading STD OUT: ", result);
                    
                
            
        );
    

我的 StdOutReader 实现:

private static final int BUFFER_SIZE = 2048000;

@Override
public T read(final InputStream stream) 
  Try.run(() -> 
    final byte[] buffer = new byte[BUFFER_SIZE];
    int length;

    while ((length = stream.read(buffer)) >= 0) 
      final byte[] finalBuffer = new byte[length];
      System.arraycopy(buffer, 0, finalBuffer, 0, length);
      socket.sendMessage(finalBuffer);
    
  ).onFailure(throwable -> 
    throw new RuntimeException("FFmpeg error", throwable);
  );

  return null;

在底层有一个简单的Process,它返回InputStream(实际上是BufferedInputStream,这非常重要)。

我的问题是我不知道流中有多少字节,JSMpeg 需要特定的数字(帧/数据包)才能正确显示流而不会受到任何干扰。 所以我人为地创建了巨大的字节数组来处理最大的结果,但是我的记忆力受到了影响。

我只想使用 WebSocket 转发每个数据包。所以它应该是这样工作的(现在也是这样工作的):

StdOut sends packet -&gt; passing packet to WebSocket -&gt; StdOut sends packet -&gt; passing packet to WebSocket -&gt; ...

另一种选择是以某种方式预测InputStream此时持有的字节数,但这对于BufferedInputStream可能是不可能的。

    我尝试使用 Reader 读取字符串行并将它们转换为字节,但它们返回的大小错误 - 这不起作用。 我尝试在 ffmpeg 上设置一些缓冲区/长度大小的数据包 - 这不起作用或流中断。 我注意到这些数据包是 188 字节的乘积,但即使将数据包转换为相似大小也会导致流中断(它们可能需要直接从 stdOut 获取大小)。 我试过readAllBytes(),但循环结束了。 我用transferToByteArrayOutputStream 尝试了一些方法 - 没有任何帮助。 我已尝试检查 available() 的大小 InputStream,但这也不起作用。 我尝试使用 Spring 5 WebFlux DataBufferStream 转换为 Flux 并订阅,但我只能读取单个字节(或者我可能不知道如何读取整个当前缓冲区)。

也许有可能修改 JSMpeg 以接收静态大小的数据包(例如,我的意思总是 2048 字节) - 如果是这样,请告诉我。

或者我可以以某种方式从发送到 stdOut 的特定字节预测数据包的长度 - 如果是这样,请告诉我。

我怎样才能做到这一点,甚至有可能吗?

** 第一版 **

根据 Denis Kokorin 的回答,我已经使用 PipeOutput 实现了它,但它具有完全相同的限制。在后台PipeOutput 只是将流内容复制到new byte[],并使用预定义的缓冲区大小进行初始化。所以我得到这样的输出:

Array length: 262144, actual length: 9588
Array length: 262144, actual length: 4700
Array length: 262144, actual length: 9024
Array length: 262144, actual length: 10152
Array length: 262144, actual length: 18048

我需要发送具有实际长度的数组(使用 WebSocket)而不是数组长度。原因是JSMpeg 库的工作方式:

视频和音频的内部缓冲区非常小(512kb 和 分别为 128kb)和 JSMpeg 将丢弃旧的(甚至未播放的)数据 为新到达的数据腾出空间而没有太多的模糊性。这可以 当网络拥塞时引入解码伪影,但是 确保将延迟保持在最低限度。

JSMpeg 的默认缓冲区大小为 524 288,这样就不会显示任何内容。

我很确定ChannelOutput 的情况会非常相似。如果没有,请告诉我如何使用 WebSocket 从SeekableByteChannel 传递数据。

您提到了 Spring,可能您也将它用于 WebSockets。 Spring 在 WebSockets 上使用 STOMP 协议。 WebSockets 几乎就像 TCP 套接字:它们允许您发送任意字节。 STOMP 类似 到 HTTP。因此可以通过 WebSocket 流式传输字节,但没有 跺脚。

是的,我没有直接提到它。我确实在使用带有 spring-boot-starter-websocket 包的 Spring 5。在我的特定情况下,我需要使用 STOMP,但我使用普通 Java 函数发送这些字节 - 我将 byte[] 作为参数 (SimpMessagingTemplate.convertAndSend) 传递。

我还想提出一个替代解决方案:如果有什么 不适合内存 - 将其保存在磁盘上。

这可能不是我的选择。这是直播,不是标准视频,我必须尽可能降低延迟。

文件上传后立即编码 MJPEG。

它不是 MJPEG 格式 - 它是 mpegts。 JSMpeg 是库的名称。 :)

ffmpeg 命令如下所示:

ffmpeg -loglevel level+info -i rtsp://test:test@10.10.10.10:554/stream -n -rtsp_transport tcp -f mpegts -codec:v mpeg1video -r 30 -s 480x320 -b:v 1500k -bf 0 -an -vf scale=480:320 -

【问题讨论】:

检查 PipeOutput.pumpTo(final OutputStream 目标,final int bufferSize)。您可以配置缓冲区大小。 @DenisKokorin 问题是bufferSize 是一个静态值。我不知道数据包长度是 9588、18048、108288 还是 4700。我想以某种方式预测/提取特定数据包的长度。例如,如果我将 bufferSize 初始化为 20000,则其余字节将作为零发送,这也会中断流式传输。 OutputStream 有方法 writeBytes(byte[] bytes, int offset, int length)。您可以使用 Arrays.copy() 填充大小长度的字节数组。 没错,但请注意,这正是我在上述代码中StdOutReader 实现(read 方法)中所做的。我需要创建一个巨大的缓冲区大小,然后为每次迭代(几毫秒)创建另一个数组 - 一遍又一遍。我正在寻找一种将数据包中的字节数直接传递到 websocket 的方法,或者如果不可能,则从InputStream 中提取(使用流read 方法)特定的数据包大小。有可能吗? 但是为什么需要巨大的缓冲区呢?将其设为 200_000 并在 OutputStream#write 中将所有可用字节复制到新字节数组并发送到 Web 套接字。 【参考方案1】:

我是 Jaffree 的作者。

首先,你不需要自定义StdReader,你可以使用PipeOutput

var outputStream = new OutputStreamWebSocketWrapper(...);

FFmpeg.atPath()
  .addInput(urlInput.fromUrl("path/to/file.mp4"))
  .addOutput(
      PipeOutput.pumpTo(outputStream)
        .setFormat("flv")
  )
  .execute();

上面的OutputStreamWebSocketWrapper 应该是简单的OutputStream 实现,它将字节发送到WebSocket。

我的问题是我不知道流中有多少字节,而 JSMpeg 需要特定的数字(帧/数据包)才能正确显示流而不会受到任何干扰。所以我人为地创建了巨大的字节数组来处理最大的结果,但是我的记忆力受到了影响。

您的 HTTP Server 和 OutputStream 似乎有问题。默认情况下,一些服务器(几乎是我所知道的)首先从 OutputStream 读取 all 数据,然后才开始向客户端发送数据。他们这样做是因为 content-length 标头。但是可以指示一些服务器实时流式传输数据。在这种情况下,客户端不会得到content-length(除非您自己指定值),因此不知道剩下多少数据。

您提到了 Spring,可能您也将它用于 WebSockets。 Spring 在 WebSockets 上使用 STOMP 协议。 WebSockets 几乎就像 TCP Sockets:它们允许您发送任意字节。 STOMP 类似于 HTTP。因此可以通过 WebSocket 流式传输字节,但无需 STOMP。

我还想提出一个替代解决方案:如果内存不适合 - 将其保存在磁盘上。您没有明确说明,但我怀疑您尝试将视频文件即时转换为 MJPEG 流并将其发送到客户端。无论如何,这种转换是 CPU 密集型的,并且可能会导致大量并发连接出现问题。此外,在动态内存中,您不能重用之前转换的数据。此外,WebSocket 不能像 HTTP 响应那样简单地缓存。

因此,您可以采用以下两种方式之一:

    按请求编码(但一次):检查是否已经创建了 MJPEG,如果没有创建并提供它 文件上传后立即编码 MJPEG。将 MJPEG 作为通用文件提供

** 更新 **

在 cmets 讨论后,我建议以下 sn-p:

OutputStream websocketOutput = new OutputStream()             
    @Override
    public void write(int b) throws IOException 
        // not implemented
    
    @Override
    public void write(byte[] bytes, int offset, int length) throws IOException 
        byte[] data = new byte[length];
        System.arraycopy(bytes, offset, data, 0, length);
        socket.sendMessage(data);
    
;
FFmpeg.atPath()
        .addInput(UrlInput.fromUrl("rtsp://servername/streamname"))
        .addOutput(
                PipeOutput.pumpTo(websocketOutput, 200_000)
                        .setFormat("mpegts")
                        .setCodec(StreamType.VIDEO, "mpeg1video")
                // other arguments here
        )
        .execute();

所以你不需要巨大的缓冲区,只需在每次迭代时将你得到的所有有意义的字节都传递给 WebSocket。

【讨论】:

PipeOutput 不幸的是没有按预期工作。我已经编辑了这个问题并提供了更多的澄清和解释。让我知道是否有办法实现我想要的。太感谢了。 :) 我已经添加了代码sn-p

以上是关于将标准输出字节从 Process 转发到 WebSocket 或读取当前 InputStream 长度的主要内容,如果未能解决你的问题,请参考以下文章

SharpPcap - 从标准输出捕获

perl open:如何将标准错误转发到标准输出? [复制]

Node.js child_process exec 的标准输出被缩短

标准输入 C++

如何获取子进程的输出

输入输出流