ffmpeg中的多个命名管道

Posted

技术标签:

【中文标题】ffmpeg中的多个命名管道【英文标题】:Multiple named pipes in ffmpeg 【发布时间】:2021-05-04 16:23:52 【问题描述】:

这个问题是this question的后续问题

在我的应用程序中,我想修改各种 mp3,然后将它们混合在一起。我知道我可以在 FFmpeg 中使用单个命令行来完成它,但它最终会变得非常混乱,因为我需要在每个样本上使用各种过滤器并且我有五个。我的想法是单独编辑每个样本,将它们保存到管道中,最后混合它们。

import subprocess
import os

def create_pipes():
    os.mkfifo("pipe1")
    os.mkfifo("pipe2")

    
def create_samp():   
    sample= subprocess.run(["ffmpeg", "-i", "https://freesound.org/data/previews/186/186942_2594536-hq.mp3", \
                            "-af", "adelay=15000|15000", "-f", "mp3", "pipe:pipe1"], stdout=subprocess.PIPE).stdout
    return(sample)

def create_samp_2():   
    sample= subprocess.run(["ffmpeg", "-i", "https://freesound.org/data/previews/370/370934_6399962-lq.ogg", \
                            "-af", "adelay=1000|1000", "-f", "mp3", "pipe:pipe2"], stdout=subprocess.PIPE).stdout
    return(sample)


def record(samp, samp_2):  
    process = subprocess.Popen(["ffmpeg", "-y", '-f', 'mp3', \
                                "-i", "https://cdns-preview-b.dzcdn.net/stream/c-b0b684fe962f93dc43f1f7ea493683a1-3.mp3", \
                                "-i", "pipe1", \
                                "-i", "pipe2", \
                                "-filter_complex", "amix=inputs=3:duration=longest", "output.mp3"], stdin=subprocess.PIPE)

    process.stdin.write(samp)  
    process.stdin.write(samp_2)
    process.stdin.close()  
    process.wait()

create_pipes()
samp = create_samp()
samp_2 = create_samp_2()
record(samp, samp_2)

当我运行脚本时,create_samp()create_samp2() 运行良好。但是我运行record(),程序卡住了,没有错误消息,所以我不知道是什么问题。

【问题讨论】:

"程序卡住,没有错误信息" ffmpeg 输出错误吗? ffmpeg 会被执行还是在此之前脚本会失败?是您的代码有问题,还是 ffmpeg 命令有问题,或者两者都有?在尝试将命令实现到代码中之前,您是否手动尝试过 ffmpeg? ffmpeg 运行前两个命令,我得到Output #0, mp3, to 'pipe:pipe1'Output #0, mp3, to 'pipe:pipe2' 但是当我运行record() 时,ffmpeg 卡在进程中间而没有错误消息。 【参考方案1】:

使用命名管道(仅限 Linux):

当有两个或更多输入流(需要从内存缓冲区进行管道传输)时,需要命名管道。使用命名管道一点也不简单......

从 FFmpeg 的角度来看,命名管道就像(不可搜索的)输入文件。

在 Python 中使用命名管道(在 Linux 中): 假设pipe1 是“命名管道”的名称(例如pipe1 = "audio_pipe1")。

    创建一个“命名管道”:

    os.mkfifo(pipe1)
    

    以“只写”文件的形式打开管道:

    fd_pipe = os.open(pipe_name, os.O_WRONLY)  # fd_pipe1 is a file descriptor (an integer).
    

    将数据以小块的形式写入管道。 根据this 的帖子,大多数 Linux 系统中管道的默认缓冲区大小为 64KBytes。 由于数据大于 65536 字节,我们需要将数据分小块写入管道。 我决定使用 1024 字节的任意块大小。 管道写入操作是一种“阻塞”操作。 我通过使用“作家”线程解决了它:

    def writer(data, pipe_name, chunk_size):
        # Open the pipes as opening "low level IO" files (open for "open for writing only").
        fd_pipe = os.open(pipe_name, os.O_WRONLY)  # fd_pipe1 is a file descriptor (an integer)
    
        for i in range(0, len(data), chunk_size):
            # Write to named pipe as writing to a "low level IO" file (but write the data in small chunks).
            os.write(fd_pipe, data[i:chunk_size+i])  # Write 1024 bytes of data to fd_pipe
    

    关闭管道:

    os.close(fd_pipe)
    

    删除(取消链接)命名管道:

    os.unlink(pipe1)
    

这是来自previous post 的示例,使用两个命名管道:

import subprocess
import os
from threading import Thread


def create_samp():
    # Read audio stream from https://freesound.org/data/previews/186/186942_2594536-hq.mp3
    # Apply adelay audio filter.
    # Encode the audio in mp3 format.
    # FFmpeg output is passed to stdout pipe, and stored in sample bytes array.
    sample1 = subprocess.run(["ffmpeg", "-i", "https://freesound.org/data/previews/186/186942_2594536-hq.mp3",
                              "-af", "adelay=15000|15000", "-f", "mp3", "pipe:"], stdout=subprocess.PIPE).stdout

    # Read second audio sample from https://cdns-preview-b.dzcdn.net/stream/c-b0b684fe962f93dc43f1f7ea493683a1-3.mp3
    sample2 = subprocess.run(["ffmpeg", "-i", "https://cdns-preview-b.dzcdn.net/stream/c-b0b684fe962f93dc43f1f7ea493683a1-3.mp3",
                              "-f", "mp3", "pipe:"], stdout=subprocess.PIPE).stdout

    return sample1, sample2


def writer(data, pipe_name, chunk_size):
    # Open the pipes as opening files (open for "open for writing only").
    fd_pipe = os.open(pipe_name, os.O_WRONLY)  # fd_pipe1 is a file descriptor (an integer)

    for i in range(0, len(data), chunk_size):
        # Write to named pipe as writing to a file (but write the data in small chunks).
        os.write(fd_pipe, data[i:chunk_size+i])  # Write 1024 bytes of data to fd_pipe

    # Closing the pipes as closing files.
    os.close(fd_pipe)


def record(samp1, samp2):
    # Names of the "Named pipes"
    pipe1 = "audio_pipe1"
    pipe2 = "audio_pipe2"

    # Create "named pipes".
    os.mkfifo(pipe1)
    os.mkfifo(pipe2)

    # Open FFmpeg as sub-process
    # Use two audio input streams:
    # 1. Named pipe: "audio_pipe1"
    # 2. Named pipe: "audio_pipe2"
    # Merge the two audio streams using amix audio filter.
    # Store the result to output file: output.mp3
    process = subprocess.Popen(["ffmpeg", "-y", '-f', 'mp3',
                                "-i", pipe1,
                                "-i", pipe2,
                                "-filter_complex", "amix=inputs=2:duration=longest", "output.mp3"],
                                stdin=subprocess.PIPE)

    # Initialize two "writer" threads (each writer writes data to named pipe in chunks of 1024 bytes).
    thread1 = Thread(target=writer, args=(samp1, pipe1, 1024))  # thread1 writes samp1 to pipe1
    thread2 = Thread(target=writer, args=(samp2, pipe2, 1024))  # thread2 writes samp2 to pipe2

    # Start the two threads
    thread1.start()
    thread2.start()

    # Wait for the two writer threads to finish
    thread1.join()
    thread2.join()

    process.wait()  # Wait for FFmpeg sub-process to finish

    # Remove the "named pipes".
    os.unlink(pipe1)
    os.unlink(pipe2)


sampl1, sampl2 = create_samp()
record(sampl1, sampl2)

更新:

使用类的相同解决方案: 使用类(“NamedPipeWriter”类)实现解决方案会更优雅一些。 该类继承Thread类,并重写run方法。

您可以创建多个对象的列表,并在循环中迭代它们(而不是为每个新输入流复制代码)。

这是使用类的相同解决方案:

import subprocess
import os
import stat
from threading import Thread


def create_samp():
    # Read audio stream from https://freesound.org/data/previews/186/186942_2594536-hq.mp3
    # Apply adelay audio filter.
    # Encode the audio in mp3 format.
    # FFmpeg output is passed to stdout pipe, and stored in sample bytes array.
    sample1 = subprocess.run(["ffmpeg", "-i", "https://freesound.org/data/previews/186/186942_2594536-hq.mp3",
                              "-af", "adelay=15000|15000", "-f", "mp3", "pipe:"], stdout=subprocess.PIPE).stdout

    # Read second audio sample from https://cdns-preview-b.dzcdn.net/stream/c-b0b684fe962f93dc43f1f7ea493683a1-3.mp3
    sample2 = subprocess.run(["ffmpeg", "-i", "https://cdns-preview-b.dzcdn.net/stream/c-b0b684fe962f93dc43f1f7ea493683a1-3.mp3",
                              "-f", "mp3", "pipe:"], stdout=subprocess.PIPE).stdout

    return sample1, sample2


class NamedPipeWriter(Thread):
    """ Write data (in small chunks) to a named pipe using a thread """

    def __init__(self, pipe_name, data):
        """ Initialization - get pipe name and data to be written """
        super().__init__()
        self._pipe_name = pipe_name
        self._chunk_size = 1024
        self._data = data
        

    def run(self):
        """ Open the pipe, write data in small chunks and close the pipe """
        chunk_size = self._chunk_size
        data = self._data

        # Open the pipes as opening files (open for "open for writing only").
        fd_pipe = os.open(self._pipe_name, os.O_WRONLY)  # fd_pipe1 is a file descriptor (an integer)

        for i in range(0, len(data), chunk_size):
            # Write to named pipe as writing to a file (but write the data in small chunks).
            os.write(fd_pipe, data[i:chunk_size+i])  # Write 1024 bytes of data to fd_pipe

        # Closing the pipes as closing files.
        os.close(fd_pipe)


    

def record(samp1, samp2):
    # Names of the "Named pipes"
    pipe1 = "audio_pipe1"
    pipe2 = "audio_pipe2"

    # Create "named pipes".
    if not stat.S_ISFIFO(os.stat(pipe1).st_mode):
        os.mkfifo(pipe1)  # Create the pipe only if not exist.

    if not stat.S_ISFIFO(os.stat(pipe2).st_mode):
        os.mkfifo(pipe2)

    # Open FFmpeg as sub-process
    # Use two audio input streams:
    # 1. Named pipe: "audio_pipe1"
    # 2. Named pipe: "audio_pipe2"
    # Merge the two audio streams using amix audio filter.
    # Store the result to output file: output.mp3
    process = subprocess.Popen(["ffmpeg", "-y", '-f', 'mp3',
                                "-i", pipe1,
                                "-i", pipe2,
                                "-filter_complex", "amix=inputs=2:duration=longest", "output.mp3"],
                                stdin=subprocess.PIPE)

    # Initialize two "writer" threads (each writer writes data to named pipe in chunks of 1024 bytes).
    named_pipe_writer1 = NamedPipeWriter(pipe1, samp1)
    named_pipe_writer2 = NamedPipeWriter(pipe2, samp2)

    # Start the two threads
    named_pipe_writer1.start()
    named_pipe_writer2.start()

    # Wait for the two writer threads to finish
    named_pipe_writer1.join()
    named_pipe_writer1.join()

    process.wait()  # Wait for FFmpeg sub-process to finish

    # Remove the "named pipes".
    os.unlink(pipe1)
    os.unlink(pipe2)


sampl1, sampl2 = create_samp()
record(sampl1, sampl2)

注意事项:

代码在 Ubuntu 18.04 中测试(在虚拟机中)。

【讨论】:

NOR 工作。停在行: fd_pipe = os.open(self._pipe_name, os.O_WRONLY) 该命令适用于“低级”操作系统操作,因此如果它不起作用,可能与您的操作系统配置或权限有关。我真的说不出来……

以上是关于ffmpeg中的多个命名管道的主要内容,如果未能解决你的问题,请参考以下文章

将 ffmpeg 输出通过管道传输到命名管道

使用命名管道使用 FFMPEG 记录 RTP VP8 数据包

如何避免命名管道中的多个作者?

将 Windows 命名管道用于 IPC 的一种有效方法

可能的竞争条件,来自多个 tee 接收者的管道输出在 BASH 脚本中的命名管道上无序到达

如何避免并行命名管道的死锁?