如何使用 Asyncio 在 3 个子进程(使用管道)之间流式处理数据并使用结果数据

Posted

技术标签:

【中文标题】如何使用 Asyncio 在 3 个子进程(使用管道)之间流式处理数据并使用结果数据【英文标题】:How to use Asyncio to stream process data between 3 subprocesses (using pipes) and consume the resulting data 【发布时间】:2021-07-02 10:45:30 【问题描述】:

我有 3 个脚本需要组合才能处理管道中的数据。脚本永远运行,直到执行被用户中断。这是它们在终端中的执行方式:

script1_producer.sh | script2_processor.sh | script3_processor.sh

script1_producer.sh 生成要处理的数据(例如,它只打印递增的数字)

i=1
while true; do
  echo $i
  i=$(($i+1))
  sleep 1
done

script2_processor.sh 使用来自 Script1 的数据并计算新的数据流(每个数字相乘*2):

while read -r line
do
  echo "$(($line*2))"
done < "$1:-/dev/stdin"

script3_processor.sh 使用来自 Script2 的数据并计算一个新的数据流(给每个数字添加一个字母):

while read -r line
do
  echo "A$(($line))"
done < "$1:-/dev/stdin"

运行script1_producer.sh | script2_processor.sh | script3_processor.sh时的结果输出:

A2
A4
A6
...

现在我希望这些脚本由 Python 子进程使用管道控制。 最后,我需要处理script3_processor.sh 的输出并对每一行执行操作。 我正在尝试使用 asyncio 来实现这一点,但如果可能的话,不使用 asyncio 是可以的。

这是我的 - 非常天真的尝试process_pipes.py

import asyncio
import subprocess
import os


async def async_receive():
    p1 = await asyncio.create_subprocess_exec(
        "./script1_producer.sh",
        stdout=subprocess.PIPE,
    )

    p2 = await asyncio.create_subprocess_exec(
        "./script2_processor.sh",
        stdin=p1.stdout,
        stdout=subprocess.PIPE,
    )

    p3 = await asyncio.create_subprocess_exec(
        "./script3_processor.sh",
        stdin=p2.stdout,
        stdout=subprocess.PIPE,
    )

    # Read just one line to test
    data = await p3.stdout.readline()
    print(data)


asyncio.run(async_receive())

不幸的是,执行此脚本时出现以下异常:

Traceback (most recent call last):
  File "process_pipes.py", line 28, in <module>
    asyncio.run(async_receive())
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "process_pipes.py", line 12, in async_receive
    p2 = await asyncio.create_subprocess_exec(
  File "/usr/lib/python3.8/asyncio/subprocess.py", line 236, in create_subprocess_exec
    transport, protocol = await loop.subprocess_exec(
  File "/usr/lib/python3.8/asyncio/base_events.py", line 1630, in subprocess_exec
    transport = await self._make_subprocess_transport(
  File "/usr/lib/python3.8/asyncio/unix_events.py", line 197, in _make_subprocess_transport
    transp = _UnixSubprocessTransport(self, protocol, args, shell,
  File "/usr/lib/python3.8/asyncio/base_subprocess.py", line 36, in __init__
    self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
  File "/usr/lib/python3.8/asyncio/unix_events.py", line 789, in _start
    self._proc = subprocess.Popen(
  File "/usr/lib/python3.8/subprocess.py", line 808, in __init__
    errread, errwrite) = self._get_handles(stdin, stdout, stderr)
  File "/usr/lib/python3.8/subprocess.py", line 1477, in _get_handles
    p2cread = stdin.fileno()
AttributeError: 'StreamReader' object has no attribute 'fileno'

我在 *** 和其他地方阅读了一些示例,告诉我以不同的方式处理管道,但无法让这些在我的场景中发挥作用。

如何模拟运行 script1_producer.sh | script2_processor.sh | script3_processor.sh 并在 Python 中处理 script3 的输出?

【问题讨论】:

【参考方案1】:

我找到了另一个解决方案,通过这个问题指导我:

    Connect two processes started with asyncio.subprocess.create_subprocess_exec()

在此之前,需要说明的一件事是脚本存在语法错误,因为在像 echo "$(($line*2))" 这样的行中,应该有更多的空格,比如 echo "$(( $line * 2 ))",bash 对空格有点傻.除此之外,一切都很好。

这里要记住一点,管道有两端,一端用于读取,另一端用于写入。所以在第一个过程中,它会像这个草图:

写完(WE) 读取结束(RE)
p0 ---> | pipe 1 | ---> p1
       WE        RE

您应该使用来自os 的管道,如上述问题中所述。这部分是这样的:

    read1, write1 = os.pipe()
    p0 = await asyncio.create_subprocess_exec(
        "./script1_producer.sh",
        stdout=write1
    )

stdout 将是管道的 WE,而对于 p1 我们有

| pipe 1 | ---> p1 -------> | pipe 2|
WE       RE=stdin  stdout=WE   

标准输入是第一个管道的 RE,标准输出是第二个管道的 WE,如下所示:

    read2, write2 = os.pipe()
    p2 = await asyncio.create_subprocess_exec(
        "./script2_processor.sh",
        stdin=read1,
        stdout=write2,
    )

在第三个过程中

| pipe 2 | ---> p3 -------> | asyncio PIPE|
WE       RE=stdin  stdout=WE   

我们一起合作

import asyncio
import subprocess
import os


async def async_receive():
    read1, write1 = os.pipe()
    p0 = await asyncio.create_subprocess_exec(
        "./script1_producer.sh",
        stdout=write1
    )

    read2, write2 = os.pipe()
    p2 = await asyncio.create_subprocess_exec(
        "./script2_processor.sh",
        stdin=read1,
        stdout=write2,
    )

    p3 = await asyncio.create_subprocess_exec(
        "./script3_processor.sh",
        stdin=read2,
        stdout=asyncio.subprocess.PIPE,
    )

    # Read just one line to test
    while True:
        data = await p3.stdout.readline()
        data = data.decode('ascii').rstrip()
        print(data)
        print("Sleeping 1 sec...")
        await asyncio.sleep(1)


asyncio.run(async_receive())

这样你仍然可以使用 asyncio。

【讨论】:

【参考方案2】:

这是在没有 asyncio 的情况下解决问题的方法 - 只需使用带有 shell=True 的 Popen 并将管道放入命令中:

import subprocess
import os


def receive():
    p = subprocess.Popen(
        "./script1_producer.sh "
        "| ./script2_processor.sh "
        "| ./script3_processor.sh",
        stdout=subprocess.PIPE, shell=True)

    while True:
        line = p.stdout.readline()
        if line:
            print(line.decode().strip())

if __name__ == '__main__':
    receive()

【讨论】:

以上是关于如何使用 Asyncio 在 3 个子进程(使用管道)之间流式处理数据并使用结果数据的主要内容,如果未能解决你的问题,请参考以下文章

如何在asyncio python中使用子进程模块限制并发进程数

如何使用 asyncio 复杂地管理 shell 进程?

如何在 asyncio python 中使用 subprocess 模块限制并发进程的数量

如何使用 asyncio 从使用 SubprocessProtocol 的子进程中读取并在任意点终止该子进程?

如何通过python3 asyncio reuse_port编写正确的多进程服务器程序?

python 多进程和多线程3 —— asyncio - 异步IO