如何将数据从 Python 异步套接字服务器发送到子进程?
Posted
技术标签:
【中文标题】如何将数据从 Python 异步套接字服务器发送到子进程?【英文标题】:How to send data from a Python asyncio socket server to a subprocess? 【发布时间】:2018-07-29 08:20:51 【问题描述】:Python 3.6
这个程序:
-
作为子进程启动 ffmpeg
等待套接字连接
在套接字上接收 PNG 图像
将 PNG 图像发送到 ffmpeg
标准输入
问题是第4步。我不知道如何将接收到的PNG图像从协程发送到ffmpeg子进程的stdin。谁能指出我正确的方向将PNG图像发送到ffmpeg子进程的标准输入?
编辑:澄清一下 - 这段代码没有任何问题,它通过套接字很好地接收 PNG。我只是不知道如何将 PNG 发送到 ffmpeg 的标准输入中。我已经完成了很多 Python,但 asyncio 对我来说是新的,事物如何联系在一起是一个谜。
谢谢!
import asyncio
import argparse, sys
import sys
import base64
from struct import unpack
parser = argparse.ArgumentParser()
parser.add_argument('--port', help='ffmpeg listen port')
parser.add_argument('--outputfilename', help='ffmpeg output filename')
args = parser.parse_args()
if not args.port:
print("port is required")
sys.exit(1)
if not args.outputfilename:
print("outputfilename is required")
sys.exit(1)
async def _read_stream(stream, cb):
while True:
line = await stream.readline()
if line:
cb(line)
else:
break
async def _stream_subprocess(cmd, stdout_cb, stderr_cb):
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE,
)
await asyncio.wait([
_read_stream(process.stdout, stdout_cb),
_read_stream(process.stderr, stderr_cb)
])
return await process.wait()
def process_stderr(line):
# ffmpeg finishes processing and writes the output file when its input is closed
# thus the completion message will come out of stderr only when the socket or stdin or whatever is closed
line = line.decode()
print(line)
if "Output" in line:
if args.outputfilename in line:
print('finished!!!!')
sys.exit(0)
def process_stdout(line):
print("STDOUT: %s" % line)
def spawn_ffmpeg(listenport, outputfilename, framerate=30, format='webm'):
outputdirectory = "sftp://username:password@10.0.0.196/var/www/static/"
input_type = "pipe:0" #stdin
params = \
f"ffmpeg " \
f"-loglevel 56 " \
f"-y -framerate framerate " \
f"-f image2pipe " \
f"-i input_type " \
f"-c:v libvpx-vp9 " \
f"-b:v 1024k " \
f"-q:v 0 " \
f"-pix_fmt yuva420p " \
f"outputdirectoryoutputfilename "
return params
async def socket_png_receiver(reader, writer):
while True:
# first the client sends the length of the data to us
lengthbuf = await reader.read(4)
length, = unpack('!I', lengthbuf)
if length == 0:
print("length was 0, finish") # a zero length PNG says that there are no more frames
break
# then we read the PNG
data = await reader.read(length)
data = data.decode() # from bytes to string
png_bytes = base64.b64decode(data) # from base64 to bytes
# next line was just a guess, so I have commented it out.
#await proc.communicate(png_bytes)
print("Got PNG, length", length)
return
loop = asyncio.get_event_loop()
command = spawn_ffmpeg("24897", args.outputfilename)
ffmpeg_process = _stream_subprocess(
command.split(),
process_stdout,
process_stderr,
)
#coro = asyncio.start_server(socket_png_receiver, '0.0.0.0', args.port, ffmpeg_process, loop=loop)
coro = asyncio.start_server(socket_png_receiver, '0.0.0.0', args.port, loop=loop)
several_futures = asyncio.gather(ffmpeg_process, coro)
server = loop.run_until_complete(several_futures)
server.close()
loop.close()
以下是@user4815162342 建议的更改
import asyncio
import argparse, sys
import sys
import base64
from struct import unpack
parser = argparse.ArgumentParser()
parser.add_argument('--port', help='ffmpeg listen port')
parser.add_argument('--outputfilename', help='ffmpeg output filename')
args = parser.parse_args()
if not args.port:
print("port is required")
sys.exit(1)
if not args.outputfilename:
print("outputfilename is required")
sys.exit(1)
if not args.outputfilename.endswith('.webm'):
print("outputfilename must end with '.webm'")
sys.exit(1)
async def _read_stream(stream, cb):
while True:
line = await stream.readline()
if line:
cb(line)
else:
break
async def _stream_subprocess(cmd, stdout_cb, stderr_cb):
global process
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE,
)
await asyncio.wait([
_read_stream(process.stdout, stdout_cb),
_read_stream(process.stderr, stderr_cb)
])
return await process.wait()
def process_stderr(line):
# ffmpeg finishes processing and writes the output file when its input is closed
# thus the completion message will come out of stderr only when the socket or stdin or whatever is closed
line = line.decode()
print(line)
if "Output" in line:
if args.outputfilename in line:
print('finished!!!!')
sys.exit(0)
def process_stdout(line):
print("STDOUT: %s" % line)
def spawn_ffmpeg(listenport, outputfilename, framerate=30, format='webm'):
outputdirectory = "sftp://username:password@10.0.0.196/var/www/static/"
input_type = "pipe:0" # stdin
params = \
f"ffmpeg " \
f"-loglevel 56 " \
f"-y " \
f"-framerate framerate " \
f"-i input_type " \
f"outputdirectoryoutputfilename "
print(params)
return params
async def socket_png_receiver(reader, writer):
while True:
# first the client sends the length of the data to us
lengthbuf = await reader.readexactly(4)
length, = unpack('!I', lengthbuf)
if length == 0:
print("length was 0, finish") # a zero length PNG says that there are no more frames
break
# then we read the PNG
print("Got PNG, length", length)
data = await reader.readexactly(length)
print(data)
png_bytes = base64.b64decode(data) # from base64 to bytes
process.stdin.write(png_bytes)
return
loop = asyncio.get_event_loop()
command = spawn_ffmpeg("24897", args.outputfilename)
ffmpeg_process = _stream_subprocess(
command.split(),
process_stdout,
process_stderr,
)
coro = asyncio.start_server(socket_png_receiver, '0.0.0.0', args.port, loop=loop)
several_futures = asyncio.gather(ffmpeg_process, coro)
server = loop.run_until_complete(several_futures)
server.close()
loop.close()
【问题讨论】:
这个问题没有具体说明提供的实现到底出了什么问题,但也许您应该尝试使用以下方式发送数据:proc.stdin.write(png_bytes)
,后跟proc.stdin.close()
?
另外,await reader.read(length)
应该是 await reader.readexactly(length)
。 StreamReader.read
的参数是最大,而不是要读取的确切字节数。
另一个可疑的事情是socket_png_receiver
似乎期望它会继续向相同的 ffmpeg 进程发送数据。这意味着 ffmpeg 必须能够在其标准输入上处理多个数据块,即它需要有一个分隔协议,就像您通过在数据之前打包长度来实现的那样。我怀疑您应该改为在 socket_png_receiver
的循环的每次迭代中启动一个新的 ffmpeg 进程。
@user4815162342 我已经更新了代码 - 以 coro 开头的行有一个错误,所以我已经更正它并将旧行注释掉了。我还注释掉了 await proc.communicate(png_bytes) ,因为这只不过是关于如何实现我的目标的巫毒猜测,只是导致了一个错误。因此,代码实际上工作正常,只是没有将数据发送到 ffmpeg 标准输入的机制,因为我根本不知道如何。
@user4815162342 你是对的 - socket_png_receiver 只会发送到一个 ffmpeg 进程。这个想法是,一旦 PNG 图像序列完成,那么 ffmpeg 和包装它的程序都会退出。
【参考方案1】:
代码有几个问题:
await reader.read(length)
应该是await reader.readexactly(length)
,因为StreamReader.read
的参数是要读取的最大字节数,它可以返回更少的字节数。
proc.communicate(png_bytes)
应更改为 proc.stdin.write(png_bytes)
。此处对communicate()
的调用不正确,因为您想继续与程序对话,而communicate()
等待所有流关闭。
asyncio.create_subprocess_exec(...)
返回的进程实例必须对socket_png_receiver
可用,例如通过使用global process
使process
变量成为全局变量。 (最好使用一个类并分配给self.process
,但这超出了这个答案的范围。)
一些潜在问题:
不需要将data
从字节解码为字符串,base64.b64decode
可以接受字节就好了。
spawn_ffmpeg()
似乎没有使用其listenport
参数。
【讨论】:
确实它工作得很好,谢谢/正是我想要的......产生了彼此和父母很好地隔离的视频处理任务。以上是关于如何将数据从 Python 异步套接字服务器发送到子进程?的主要内容,如果未能解决你的问题,请参考以下文章
将数据从套接字从C ++服务器发送到Python客户端时出现问题
如何将数据从 Android 中的数据报套接字发送到 Node js 服务器?