如何将数据从 Python 异步套接字服务器发送到子进程?

Posted

技术标签:

【中文标题】如何将数据从 Python 异步套接字服务器发送到子进程?【英文标题】:How to send data from a Python asyncio socket server to a subprocess? 【发布时间】:2018-07-29 08:20:51 【问题描述】:

Python 3.6

这个程序:

    作为子进程启动 ffmpeg 等待套接字连接 在套接字上接收 PNG 图像 将 PNG 图像发送到 ffmpeg 标准输入

问题是第4步。我不知道如何将接收到的PNG图像从协程发送到ffmpeg子进程的stdin。谁能指出我正确的方向将PNG图像发送到ffmpeg子进程的标准输入?

编辑:澄清一下 - 这段代码没有任何问题,它通过套接字很好地接收 PNG。我只是不知道如何将 PNG 发送到 ffmpeg 的标准输入中。我已经完成了很多 Python,但 asyncio 对我来说是新的,事物如何联系在一起是一个谜。

谢谢!

import asyncio
import argparse, sys
import sys
import base64
from struct import unpack

parser = argparse.ArgumentParser()
parser.add_argument('--port', help='ffmpeg listen port')
parser.add_argument('--outputfilename', help='ffmpeg output filename')
args = parser.parse_args()
if not args.port:
    print("port is required")
    sys.exit(1)
if not args.outputfilename:
    print("outputfilename is required")
    sys.exit(1)

async def _read_stream(stream, cb):
    while True:
        line = await stream.readline()
        if line:
            cb(line)
        else:
            break

async def _stream_subprocess(cmd, stdout_cb, stderr_cb):
    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
    )

    await asyncio.wait([
        _read_stream(process.stdout, stdout_cb),
        _read_stream(process.stderr, stderr_cb)
    ])
    return await process.wait()


def process_stderr(line):
    # ffmpeg finishes processing and writes the output file when its input is closed
    # thus the completion message will come out of stderr only when the socket or stdin or whatever is closed
    line = line.decode()
    print(line)
    if "Output" in line:
        if args.outputfilename in line:
            print('finished!!!!')
            sys.exit(0)

def process_stdout(line):
    print("STDOUT: %s" % line)

def spawn_ffmpeg(listenport, outputfilename, framerate=30, format='webm'):
    outputdirectory = "sftp://username:password@10.0.0.196/var/www/static/"
    input_type = "pipe:0" #stdin

    params = \
        f"ffmpeg  " \
        f"-loglevel 56 " \
        f"-y -framerate framerate " \
        f"-f image2pipe " \
        f"-i input_type " \
        f"-c:v libvpx-vp9 " \
        f"-b:v 1024k " \
        f"-q:v 0 " \
        f"-pix_fmt yuva420p " \
        f"outputdirectoryoutputfilename "

    return params


async def socket_png_receiver(reader, writer):
    while True:
        # first the client sends the length of the data to us
        lengthbuf = await reader.read(4)
        length, = unpack('!I', lengthbuf)
        if length == 0:
            print("length was 0, finish") # a zero length PNG says that there are no more frames
            break
        # then we read the PNG
        data = await reader.read(length)
        data = data.decode() # from bytes to string
        png_bytes = base64.b64decode(data) # from base64 to bytes
        # next line was just a guess, so I have commented it out.
        #await proc.communicate(png_bytes)
        print("Got PNG, length", length)
    return


loop = asyncio.get_event_loop()
command = spawn_ffmpeg("24897", args.outputfilename)
ffmpeg_process = _stream_subprocess(
    command.split(),
    process_stdout,
    process_stderr,
)
#coro = asyncio.start_server(socket_png_receiver, '0.0.0.0', args.port, ffmpeg_process, loop=loop)
coro = asyncio.start_server(socket_png_receiver, '0.0.0.0', args.port, loop=loop)
several_futures = asyncio.gather(ffmpeg_process, coro)
server = loop.run_until_complete(several_futures)
server.close()
loop.close()

以下是@user4815162342 建议的更改

import asyncio
import argparse, sys
import sys
import base64
from struct import unpack

parser = argparse.ArgumentParser()
parser.add_argument('--port', help='ffmpeg listen port')
parser.add_argument('--outputfilename', help='ffmpeg output filename')
args = parser.parse_args()
if not args.port:
    print("port is required")
    sys.exit(1)
if not args.outputfilename:
    print("outputfilename is required")
    sys.exit(1)
if not args.outputfilename.endswith('.webm'):
    print("outputfilename must end with '.webm'")
    sys.exit(1)

async def _read_stream(stream, cb):
    while True:
        line = await stream.readline()
        if line:
            cb(line)
        else:
            break


async def _stream_subprocess(cmd, stdout_cb, stderr_cb):
    global process
    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
    )

    await asyncio.wait([
        _read_stream(process.stdout, stdout_cb),
        _read_stream(process.stderr, stderr_cb)
    ])
    return await process.wait()


def process_stderr(line):
    # ffmpeg finishes processing and writes the output file when its input is closed
    # thus the completion message will come out of stderr only when the socket or stdin or whatever is closed
    line = line.decode()
    print(line)
    if "Output" in line:
        if args.outputfilename in line:
            print('finished!!!!')
            sys.exit(0)


def process_stdout(line):
    print("STDOUT: %s" % line)


def spawn_ffmpeg(listenport, outputfilename, framerate=30, format='webm'):
    outputdirectory = "sftp://username:password@10.0.0.196/var/www/static/"
    input_type = "pipe:0"  # stdin

    params = \
        f"ffmpeg  " \
        f"-loglevel 56 " \
        f"-y " \
        f"-framerate framerate " \
        f"-i input_type " \
        f"outputdirectoryoutputfilename "

    print(params)
    return params


async def socket_png_receiver(reader, writer):
    while True:
        # first the client sends the length of the data to us
        lengthbuf = await reader.readexactly(4)
        length, = unpack('!I', lengthbuf)
        if length == 0:
            print("length was 0, finish")  # a zero length PNG says that there are no more frames
            break
        # then we read the PNG
        print("Got PNG, length", length)
        data = await reader.readexactly(length)
        print(data)
        png_bytes = base64.b64decode(data)  # from base64 to bytes
        process.stdin.write(png_bytes)
    return


loop = asyncio.get_event_loop()
command = spawn_ffmpeg("24897", args.outputfilename)
ffmpeg_process = _stream_subprocess(
    command.split(),
    process_stdout,
    process_stderr,
)
coro = asyncio.start_server(socket_png_receiver, '0.0.0.0', args.port, loop=loop)
several_futures = asyncio.gather(ffmpeg_process, coro)
server = loop.run_until_complete(several_futures)
server.close()
loop.close()

【问题讨论】:

这个问题没有具体说明提供的实现到底出了什么问题,但也许您应该尝试使用以下方式发送数据:proc.stdin.write(png_bytes),后跟proc.stdin.close() 另外,await reader.read(length) 应该是 await reader.readexactly(length)StreamReader.read 的参数是最大,而不是要读取的确切字节数。 另一个可疑的事情是socket_png_receiver 似乎期望它会继续向相同的 ffmpeg 进程发送数据。这意味着 ffmpeg 必须能够在其标准输入上处理多个数据块,即它需要有一个分隔协议,就像您通过在数据之前打包长度来实现的那样。我怀疑您应该改为在 socket_png_receiver 的循环的每次迭代中启动一个新的 ffmpeg 进程。 @user4815162342 我已经更新了代码 - 以 coro 开头的行有一个错误,所以我已经更正它并将旧行注释掉了。我还注释掉了 await proc.communicate(png_bytes) ,因为这只不过是关于如何实现我的目标的巫毒猜测,只是导致了一个错误。因此,代码实际上工作正常,只是没有将数据发送到 ffmpeg 标准输入的机制,因为我根本不知道如何。 @user4815162342 你是对的 - socket_png_receiver 只会发送到一个 ffmpeg 进程。这个想法是,一旦 PNG 图像序列完成,那么 ffmpeg 和包装它的程序都会退出。 【参考方案1】:

代码有几个问题:

await reader.read(length) 应该是await reader.readexactly(length),因为StreamReader.read 的参数是要读取的最大字节数,它可以返回更少的字节数。

proc.communicate(png_bytes) 应更改为 proc.stdin.write(png_bytes)。此处对communicate() 的调用不正确,因为您想继续与程序对话,而communicate() 等待所有流关闭。

asyncio.create_subprocess_exec(...) 返回的进程实例必须对socket_png_receiver 可用,例如通过使用global process 使process 变量成为全局变量。 (最好使用一个类并分配给self.process,但这超出了这个答案的范围。)

一些潜在问题:

不需要将data从字节解码为字符串,base64.b64decode可以接受字节就好了。

spawn_ffmpeg() 似乎没有使用其listenport 参数。

【讨论】:

确实它工作得很好,谢谢/正是我想要的......产生了彼此和父母很好地隔离的视频处理任务。

以上是关于如何将数据从 Python 异步套接字服务器发送到子进程?的主要内容,如果未能解决你的问题,请参考以下文章

将数据从套接字从C ++服务器发送到Python客户端时出现问题

计算从客户端发送到服务器的数据包数量?

如何将数据从 Web 套接字服务器发送到客户端?

如何将数据从 Android 中的数据报套接字发送到 Node js 服务器?

Python 3套接字客户端发送数据和C++套接字服务器接收偏移数据?

使用异步套接字编程发送二进制数据并读取其值