Python3 and C# Concurrent Programming: Processes

Posted by 逸鹏说道

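The previous post covered a lot of Linux process fundamentals, so they are not repeated here. This post moves on to concurrent programming in Python; corrections are welcome.

If anything here is unclear, see the previous post: https://www.cnblogs.com/dotnetcrazy/p/9363810.html

Official documentation: https://docs.python.org/3/library/concurrency.html

Online preview: http://github.lesschina.com/python/base/concurrency/2.并发编程-进程篇.html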

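1. Processes

Official documentation: https://docs.python.org/3/library/multiprocessing.html

Code: https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/PythonProcess

1.1. Process

Creating a process in Python is straightforward. Here is an example (this approach is cross-platform, whereas os.fork is only available on Unix-like systems):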

import os
# Note: import Process, not process (class names start with an uppercase letter)
from multiprocessing import Process

def test(name):
    print("[child-%s]PID:%d,PPID:%d" % (name, os.getpid(), os.getppid()))

def main():
    print("[parent]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))
    p = Process(target=test, args=("cutie", ))  # a one-element tuple needs the trailing comma: (x,)
    p.start()
    p.join()  # the parent reaps the child (calls the wait family internally)

if __name__ == '__main__':
    main()

Output:

[parent]PID:25729,PPID:23434
[child-cutie]PID:25730,PPID:25729

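To create a child process, pass in a target function and its arguments, then call start() to launch it.

join() wraps the parent's reaping of the child (mainly so that no zombie process is left behind).

The other parameters are described in the source and the documentation. Here is the signature of __init__ from the source: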

def __init__(self,group=None,target=None,name=None,args=(),kwargs={},*,daemon=None)

Extras: name is an alias for the current process instance.

  1. p.is_alive(): whether the process instance p is still running
  2. p.terminate(): terminate the process (sends SIGTERM)
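
The attributes listed above can be exercised together. The following is a minimal sketch, not code from the original post: the worker `nap` and the process name "sleeper" are made up for illustration, and the negative exit code reflects the Unix convention of reporting the terminating signal as a negative number.

```python
import signal
import time
from multiprocessing import Process


def nap(seconds):
    """Hypothetical worker that just sleeps."""
    time.sleep(seconds)


def terminate_demo():
    """Start a child, terminate it, and report what the parent observes."""
    p = Process(target=nap, args=(30, ), name="sleeper")
    p.start()
    alive_before = p.is_alive()  # the child is still sleeping at this point
    p.terminate()                # sends SIGTERM on Unix
    p.join()                     # reap the child so that exitcode is populated
    return p.name, alive_before, p.is_alive(), p.exitcode


if __name__ == '__main__':
    print(terminate_demo())
```

Note that terminate() gives the child no chance to clean up, so it is best kept for workers that hold no locks or half-written files.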

The same example written in OOP style looks like this (when no target is given, the run() method is what gets executed):

import os
from multiprocessing import Process

class My_Process(Process):
    # Override __init__ of the Process class
    def __init__(self, name):
        self.__name = name
        Process.__init__(self)  # call the parent class initializer

    # Override run() of the Process class
    def run(self):
        print("[child-%s]PID:%d,PPID:%d" % (self.__name, os.getpid(),
                                            os.getppid()))

def main():
    print("[parent]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))
    p = My_Process("cutie")  # no target given, so run() is executed
    p.start()
    p.join()  # the parent reaps the child (calls the wait family internally)


if __name__ == '__main__':
    main()

PS: multiprocessing.Process deals with zombie processes itself, so unlike os.fork there is no need to write and install your own signal handler.


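1.1. A look at the source

Now for some of the internals (skip this part if you only want to use the API).

Newer versions may wrap things in several layers. In that case it helps to read the Python 3.3.x series (an early Python 3 release in which much of the code is still exposed and easy to follow).

multiprocessing/process.py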

# Since 3.4.x, Process has a BaseProcess base class
# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/process.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/process.py
def join(self, timeout=None):
    '''Wait until child process terminates'''
    self._check_closed()
    # assert: when the condition is False an AssertionError is raised with the message that follows
    # Handy during development; `python3 -O xxx` strips all assertions at deployment
    assert self._parent_pid == os.getpid(), 'can only join a child process'
    assert self._popen is not None, 'can only join a started process'
    res = self._popen.wait(timeout)  # essentially the wait family covered last time
    if res is not None:
        _children.discard(self)  # drop the finished child from the set of live children

multiprocessing/popen_fork.py

# Since 3.4.x this lives in popen_fork.py (it used to be multiprocessing/forking.py)
# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/popen_fork.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/popen_fork.py
def wait(self, timeout=None):
    if self.returncode is None:
        # Timeout handling
        if timeout is not None:
            from multiprocessing.connection import wait
            if not wait([self.sentinel], timeout):
                return None
        # The core operation
        return self.poll(os.WNOHANG if timeout == 0.0 else 0)
    return self.returncode

# Recap from last time: with os.WNOHANG, waitpid() does not block when no child has exited
def poll(self, flag=os.WNOHANG):
    if self.returncode is None:
        try:
            # waitpid is what is called underneath
            pid, sts = os.waitpid(self.pid, flag)
        except OSError as e:
            # Child process not yet created
            # e.errno == errno.ECHILD == 10
            return None
        if pid == self.pid:
            if os.WIFSIGNALED(sts):
                self.returncode = -os.WTERMSIG(sts)
            else:
                assert os.WIFEXITED(sts), "Status is {:n}".format(sts)
                self.returncode = os.WEXITSTATUS(sts)
    return self.returncode
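
The status decoding in poll() can be tried in isolation. The sketch below is Unix-only and is not part of CPython; the helper name `exit_code_of` is invented here. It forks a child, waits for it with a blocking waitpid, and applies the same rule as above: a negative signal number if the child was killed, otherwise its exit status.

```python
import os
import signal


def exit_code_of(child_action):
    """Fork, run child_action in the child, and decode the wait status
    the same way poll() does (Unix only)."""
    pid = os.fork()
    if pid == 0:  # child branch
        try:
            child_action()
            code = 0
        except BaseException:
            code = 1
        os._exit(code)  # never fall back into the parent's code path
    _, status = os.waitpid(pid, 0)  # flag 0: block until the child exits
    if os.WIFSIGNALED(status):
        return -os.WTERMSIG(status)  # killed by a signal: negative number
    return os.WEXITSTATUS(status)    # normal exit: the exit status


if __name__ == '__main__':
    print(exit_code_of(lambda: os._exit(3)))
    print(exit_code_of(lambda: os.kill(os.getpid(), signal.SIGKILL)))
```

This is why Process.exitcode is negative for a child that was terminated by a signal.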

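A brief note on assertions (do not overuse them):

If the condition is true, assert does nothing; otherwise it raises an AssertionError with an optional error message.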

def test(a, b):
    assert b != 0, "the denominator must not be 0"
    return a / b


def main():
    test(1, 0)


if __name__ == '__main__':
    main()

Result:

Traceback (most recent call last):
  File "0.assert.py", line 11, in <module>
    main()
  File "0.assert.py", line 7, in main
    test(1, 0)
  File "0.assert.py", line 2, in test
    assert b != 0, "the denominator must not be 0"
AssertionError: the denominator must not be 0

Passing the -O option at run time makes Python skip assert statements, e.g.:

python3 -O 0.assert.py

Traceback (most recent call last):
  File "0.assert.py", line 11, in <module>
    main()
  File "0.assert.py", line 7, in main
    test(1, 0)
  File "0.assert.py", line 3, in test
    return a / b
ZeroDivisionError: division by zero
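
Because -O removes assertions, a check that must always run is usually written as an explicit raise instead. A minimal sketch (the function name `divide` and the choice of ValueError are assumptions made for this illustration):

```python
def divide(a, b):
    """Divide a by b, rejecting a zero denominator even under `python3 -O`."""
    if b == 0:
        # An explicit raise is not stripped by -O, unlike assert
        raise ValueError("the denominator must not be 0")
    return a / b


if __name__ == '__main__':
    try:
        divide(1, 0)
    except ValueError as ex:
        print("rejected:", ex)
    print(divide(6, 3))
```

Assertions then remain for internal invariants, while input validation keeps working in optimized runs.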

Further reading:

https://docs.python.org/3/library/unittest.html

https://www.cnblogs.com/shangren/p/8038935.html


1.2. Process pool

With several processes there is no need to manage them by hand; Pool does it for you. An example first:

import os
import time
from multiprocessing import Pool  # note the capital P

def test(name):
    print("[child-%s]PID=%d,PPID=%d" % (name, os.getpid(), os.getppid()))
    time.sleep(1)

def main():
    print("[parent]PID=%d,PPID=%d" % (os.getpid(), os.getppid()))
    p = Pool(5)  # at most 5 worker processes (defaults to os.cpu_count() if omitted)
    for i in range(10):
        # asynchronous submission
        p.apply_async(test, args=(i, ))  # apply() is the blocking variant (avoid unless needed)
    p.close()  # close the pool: no new tasks are accepted
    p.join()  # wait for all workers to finish and release their resources
    print("over")

if __name__ == '__main__':
    main()

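Figure: 1. process pool

close() must be called before join() (terminate() also works), and after close() no new tasks can be submitted to the pool (the reason is explained below). Unlike Process.join(), Pool.join() does not take a timeout argument.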
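
The ordering rule can be observed directly. The following sketch is an illustration only (the names `square` and `submit_after_close` are invented here): it submits one task, closes the pool, and then tries to submit a second task, which the pool rejects with a ValueError.

```python
from multiprocessing import Pool


def square(x):
    return x * x


def submit_after_close():
    """Return the first result and the error raised by a late submission."""
    pool = Pool(2)
    first = pool.apply_async(square, (3, ))
    pool.close()  # from here on the pool accepts no new tasks
    try:
        pool.apply_async(square, (4, ))
        message = None
    except ValueError as ex:
        message = str(ex)
    pool.join()  # allowed now, because close() was called first
    return first.get(), message


if __name__ == '__main__':
    print(submit_after_close())
```

Tasks that were already queued before close() still run to completion; only new submissions are refused.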


1.3. A look at the source

To confirm that the default Pool size is the CPU count, look at the source:

multiprocessing/pool.py

# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/pool.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/pool.py
class Pool(object):
    def __init__(self, processes=None, ...):  # processes: the requested number of workers
        if processes is None:
            processes = os.cpu_count() or 1  # os.cpu_count() ~ number of CPUs

In the source, apply_async also accepts callback functions (callback and error_callback):

def apply_async(self,func,args=(),kwds={},callback=None,error_callback=None):
    if self._state != RUN:
        raise ValueError("Pool not running")
    result = ApplyResult(self._cache, callback, error_callback)
    self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
    return result

An example (much like callbacks in jQuery):

import os
import time
from multiprocessing import Pool  # note the capital P

def test(name):
    print("[child %s]PID=%d,PPID=%d" % (name, os.getpid(), os.getppid()))
    time.sleep(1)
    return name

def error_test(name):
    print("[child %s]PID=%d,PPID=%d" % (name, os.getpid(), os.getppid()))
    raise Exception("[child %s]oops, I crashed~" % name)

def callback(result):
    """Callback invoked after a task succeeds"""
    print("[child %s]finished" % result)  # None if the task returns nothing

def error_callback(msg):
    """Callback invoked after a task fails"""
    print(msg)

def main():
    print("[parent]PID=%d,PPID=%d" % (os.getpid(), os.getppid()))
    p = Pool()  # defaults to the CPU count
    for i in range(5):
        # make two of the tasks fail
        if i > 2:
            p.apply_async(
                error_test,
                args=(i, ),
                callback=callback,
                error_callback=error_callback)  # asynchronous submission
        else:
            # asynchronous submission; callback runs on success (a bit like jQuery)
            p.apply_async(test, args=(i, ), callback=callback)
    p.close()  # close the pool: no new tasks are accepted
    p.join()  # wait for all workers to finish and release their resources
    print("over")

if __name__ == '__main__':
    main()

Output:

[parent]PID=12348,PPID=10999
[child 0]PID=12349,PPID=12348
[child 2]PID=12351,PPID=12348
[child 1]PID=12350,PPID=12348
[child 3]PID=12352,PPID=12348
[child 4]PID=12352,PPID=12348
[child 3]oops, I crashed~
[child 4]oops, I crashed~
[child 0]finished
[child 2]finished
[child 1]finished
over
 

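Continuing from the above, a word on getting a function's return value. The previous example obtained it through the success callback; this time the built-in method is used: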

import time
from multiprocessing import Pool, TimeoutError

def test(x):
    """Square x (after a one-second delay)"""
    time.sleep(1)
    return x * x

def main():
    pool = Pool()
    task = pool.apply_async(test, (10, ))
    print(task)
    try:
        print(task.get(timeout=1))
    except TimeoutError as ex:
        print("timed out~", ex)

if __name__ == '__main__':
    main()

Output (apply_async returns an ApplyResult object whose get() method fetches the return value; here the 1-second timeout expires first because the task itself sleeps for 1 second):

<multiprocessing.pool.ApplyResult object at 0x7fbc354f50b8>
timed out~
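
Besides timing out, get() also re-raises in the parent whatever exception the task raised in the worker. A minimal sketch under assumed names (`square`, `fail` and `collect` are invented for this illustration), with a generous timeout so that the results do arrive:

```python
from multiprocessing import Pool


def square(x):
    return x * x


def fail(x):
    raise RuntimeError("task %d failed" % x)


def collect():
    """Fetch one successful and one failing result from the pool."""
    with Pool(2) as pool:
        ok = pool.apply_async(square, (10, ))
        bad = pool.apply_async(fail, (1, ))
        value = ok.get(timeout=10)  # returns the task's return value
        try:
            bad.get(timeout=10)     # re-raises the worker's exception here
            error = None
        except RuntimeError as ex:
            error = str(ex)
        # successful() is only meaningful once the result is ready
        return value, bad.successful(), error


if __name__ == '__main__':
    print(collect())
```

This makes get() an alternative to error_callback when the caller wants to handle failures with ordinary try/except.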

One more example, this time covering the map and imap methods of Pool (compare with map in jQuery):

from multiprocessing import Pool

def test(x):
    return x * x

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        task = pool.apply_async(test, (10, ))
        print(task.get(timeout=1))

        obj_list = pool.map(test, range(10))
        print(obj_list)
        # imap returns an iterator object
        obj_iter = pool.imap(test, range(10))
        print(obj_iter)
        next(obj_iter)  # consumes the first result (0), so the loop starts at 1
        for i in obj_iter:
            print(i, end=" ")

Output:

100
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
<multiprocessing.pool.IMapIterator object at 0x7ff7f9734198>
1 4 9 16 25 36 49 64 81
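
Two related options are worth knowing next to map and imap: imap_unordered yields results in completion order, and the chunksize parameter of map groups inputs into larger tasks. The sketch below is an illustration under assumed names (`square`, `variants`), not code from the original post.

```python
from multiprocessing import Pool


def square(x):
    return x * x


def variants():
    """Compute the same squares through three Pool methods."""
    with Pool(processes=4) as pool:
        # imap keeps the input order
        ordered = list(pool.imap(square, range(10)))
        # imap_unordered yields each result as soon as it is ready
        unordered = list(pool.imap_unordered(square, range(10)))
        # chunksize sends the inputs to the workers in batches of 5
        chunked = pool.map(square, range(10), chunksize=5)
    return ordered, sorted(unordered), chunked


if __name__ == '__main__':
    print(variants())
```

The unordered result is sorted here only so that it can be compared with the others; in real use its order depends on which worker finishes first.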

A quick look at the source (it builds on the iterator protocol):

class IMapIterator(object):
    def __init__(self, cache):
        self._cond = threading.Condition(threading.Lock())
        self._job = next(job_counter)
        self._cache = cache
        self._items = collections.deque()
        self._index = 0
        self._length = None
        self._unsorted = {}
        cache[self._job] = self

    def __iter__(self):
        return self  # an iterator returns itself

    # The next method does the actual work
    def next(self, timeout=None):
        with self._cond:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    raise StopIteration from None
                self._cond.wait(timeout)
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        raise StopIteration from None
                    raise TimeoutError from None

        success, value = item
        if success:
            return value
        raise value
......
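
The shape of the class above is the ordinary iterator protocol: __iter__ returns the object itself and the next step raises StopIteration when nothing is left. A stripped-down sketch follows; the class `QueueIterator` is hypothetical and has none of the locking or timeout handling of IMapIterator.

```python
import collections


class QueueIterator:
    """Hypothetical iterator that drains a deque, loosely modelled on IMapIterator."""

    def __init__(self, items):
        self._items = collections.deque(items)

    def __iter__(self):
        return self  # an iterator returns itself

    def __next__(self):
        try:
            return self._items.popleft()
        except IndexError:
            raise StopIteration from None  # signals the end of iteration


if __name__ == '__main__':
    it = QueueIterator([0, 1, 4])
    next(it)         # discard the first item, as the imap example does
    print(list(it))  # the remaining items
```

Because the object is its own iterator, it can be consumed only once, which is also why the for loop in the imap example continues after the manual next() call.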

Further reading: a discussion on killing child processes gracefully https://segmentfault.com/q/1010000005077517


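1.4. Extension: subprocess

Official documentation: https://docs.python.org/3/library/subprocess.html

Remember the execlxxx family from before, which replaces the current process image with another program?

subprocess can be seen as a layer on top of that, and a far more capable one. An example first (based on the earlier os.execlp example):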

import subprocess

def main():
    # os.execlp("ls", "ls", "-al")  # runs a command found through the PATH environment variable
    result = subprocess.run(["ls", "-al"])
    print(result)

if __name__ == '__main__':
    main()

Output:

total 44
drwxrwxr-x 2 dnt dnt 4096 Aug  7 17:32 .
drwxrwxr-x 4 dnt dnt 4096 Aug  6 08:01 ..
-rw-rw-r-- 1 dnt dnt  151 Aug  3 10:49 0.assert.py
-rw-rw-r-- 1 dnt dnt  723 Aug  5 18:00 1.process2.py
-rw-rw-r-- 1 dnt dnt  501 Aug  3 10:20 1.process.py
-rw-rw-r-- 1 dnt dnt 1286 Aug  6 08:16 2.pool1.py
-rw-rw-r-- 1 dnt dnt  340 Aug  7 16:38 2.pool2.py
-rw-rw-r-- 1 dnt dnt  481 Aug  7 16:50 2.pool3.py
-rw-rw-r-- 1 dnt dnt  652 Aug  5 17:01 2.pool.py
-rw-rw-r-- 1 dnt dnt  191 Aug  7 17:33 3.subprocess.py
CompletedProcess(args=['ls', '-al'], returncode=0)
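
In the run above the listing goes straight to the terminal and the CompletedProcess carries only args and returncode. To receive the output in Python, run() can capture it. A minimal sketch: the helper `run_captured` is invented here, and a child Python interpreter stands in for `ls` so that the example does not depend on the platform (capture_output requires Python 3.7 or later).

```python
import subprocess
import sys


def run_captured(code):
    """Run a Python one-liner in a child interpreter and capture its output."""
    return subprocess.run(
        [sys.executable, "-c", code],
        capture_output=True,  # shorthand for stdout=PIPE, stderr=PIPE
        text=True)            # decode the captured bytes to str


if __name__ == '__main__':
    result = run_captured("print('hello')")
    print(result.returncode, repr(result.stdout))
```

With capture enabled, result.stdout and result.stderr hold the child's output instead of None.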

Documentation

The module's own docstring gives a good overview:

r"""
Subprocesses with accessible I/O streams

This module allows you to spawn processes, connect to their
input/output/error pipes, and obtain their return codes.

For a complete description of this module see the Python documentation:
https://docs.python.org/3/library/subprocess.html

Main API
========
run(...): Runs a command, waits for it to complete,
then returns a CompletedProcess instance.

Popen(...): A class for flexibly executing a command in a new process

Constants
---------
DEVNULL: Special value that indicates that os.devnull should be used

PIPE:    Special value that indicates a pipe should be created

STDOUT:  Special value that indicates that stderr should go to stdout

Older API (best avoided in new code; prefer run())
=========
call(...): Runs a command, waits for it to complete, then returns the return code.

check_call(...): Same as call() but raises CalledProcessError()
    if return code is not 0

check_output(...): Same as check_call() but returns the contents of stdout
    instead of a return code

getoutput(...): Runs a command in the shell, waits for it to complete,
    then returns the output

getstatusoutput(...): Runs a command in the shell, waits for it to complete,
    then returns a (exitcode, output) tuple
"""

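The source is worth reading. Internally run() is built on the Popen class (not on os.popen, which was used in the process primer when discussing daemons and is itself implemented on top of subprocess.Popen):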

def run(*popenargs, input=None, capture_output=False,
        timeout=None, check=False, **kwargs):

    if input is not None:
        if 'stdin' in kwargs:
            raise ValueError('stdin and input arguments may not both be used.')
        kwargs['stdin'] = PIPE

    if capture_output:
        if ('stdout' in kwargs) or ('stderr' in kwargs):
            raise ValueError('stdout and stderr arguments may not be used '
                             'with capture_output.')
        kwargs['stdout'] = PIPE
        kwargs['stderr'] = PIPE

    with Popen(*popenargs, **kwargs) as process:
        try:
            stdout, stderr = process.communicate(input, timeout=timeout)
        except TimeoutExpired:
            process.kill()
            stdout, stderr = process.communicate()
            raise TimeoutExpired(
                process.args, timeout, output=stdout, stderr=stderr)
        except:  # Including KeyboardInterrupt, communicate handled that.
            process.kill()
            # We don't call process.wait() as .__exit__ does that for us.
            raise
        retcode = process.poll()
        if check and retcode:
            raise CalledProcessError(
                retcode, process.args, output=stdout, stderr=stderr)
    return CompletedProcess(process.args, retcode, stdout, stderr)
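
The two exception paths in run() can be triggered from the caller's side. The following is a sketch under assumptions: the helper `run_checked` and its return strings are invented for illustration, and a child Python interpreter plays the role of the external command.

```python
import subprocess
import sys


def run_checked(code, timeout=None):
    """Run a Python one-liner and describe how the child finished."""
    try:
        subprocess.run([sys.executable, "-c", code], check=True, timeout=timeout)
        return "ok"
    except subprocess.CalledProcessError as ex:
        # raised by check=True when the exit status is non-zero
        return "exit %d" % ex.returncode
    except subprocess.TimeoutExpired:
        # run() has already killed and reaped the child at this point
        return "timeout"


if __name__ == '__main__':
    print(run_checked("pass"))
    print(run_checked("import sys; sys.exit(2)"))
    print(run_checked("import time; time.sleep(30)", timeout=0.5))
```

Unlike Popen.communicate(), run() cleans up the child itself when the timeout expires, as the source above shows.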

Return type: CompletedProcess

# https://github.com/lotapp/cpython3/blob/master/Lib/subprocess.py
class CompletedProcess(object):
    def __init__(self, args, returncode, stdout=None, stderr=None):
        self.args = args
        self.returncode = returncode
        self.stdout = stdout
        self.stderr = stderr

    def __repr__(self):
        """Render the object in a fixed format"""
        args = [
            'args={!r}'.format(self.args),
            'returncode={!r}'.format(self.returncode)
        ]
        if self.stdout is not None:
            args.append('stdout={!r}'.format(self.stdout))
        if self.stderr is not None:
            args.append('stderr={!r}'.format(self.stderr))
        return "{}({})".format(type(self).__name__, ', '.join(args))

    def check_returncode(self):
        """Raise CalledProcessError if the exit code is non-zero"""
        if self.returncode:
            raise CalledProcessError(self.returncode, self.args, self.stdout,
                                     self.stderr)

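Simple demo

Another example to show how convenient it is:

import subprocess

def main():
    # On Linux, ping keeps running until it is interrupted (e.g. with Ctrl+C)
    result = subprocess.run(["ping", "www.baidu.com"])
    # stdout was not captured, so the output goes straight to the terminal
    # and result.stdout is None
    print(result.stdout)

if __name__ == '__main__':
    main()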

Figure: 2.subprocess.gif

Interactive demo

A more powerful example (any interactive program works, such as ftp, nslookup and so on), using Popen.communicate:

import subprocess

def main():
    process = subprocess.Popen(
        ["ipython3"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    try:
        # interact with ipython3 through its stdin and stdout
        out, err = process.communicate(input=b'print("hello")', timeout=3)
        print("Out:%s\nErr:%s" % (out.decode(), err.decode()))
    except subprocess.TimeoutExpired:
        # communicate() raises subprocess.TimeoutExpired on timeout; the child
        # is not killed automatically, so it has to be handled here
        process.kill()
        out, err = process.communicate()
        print("Out:%s\nErr:%s" % (out.decode(), err.decode()))

if __name__ == '__main__':
    main()

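Output:

IPython 6.4.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: hello

In [2]: Do you really want to exit ([y]/n)?

Err:

Note: if the timeout expires, the child process is not terminated, so you have to deal with it yourself (as the official documentation points out).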
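
The kill-then-collect pattern from the note can be wrapped in a small helper. This is a sketch under assumptions: the helper `talk` is invented here, and a child Python interpreter replaces ipython3 so that the example runs without extra software.

```python
import subprocess
import sys


def talk(code, text, timeout):
    """Send text to a child's stdin and return (stdout, timed_out)."""
    process = subprocess.Popen(
        [sys.executable, "-c", code],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE)
    try:
        out, _ = process.communicate(input=text, timeout=timeout)
        return out, False
    except subprocess.TimeoutExpired:
        process.kill()                  # the child is not killed automatically
        out, _ = process.communicate()  # collect what was written, then reap
        return out, True


if __name__ == '__main__':
    print(talk("import sys; print(sys.stdin.read().upper())", b"hello", 10))
    print(talk("import time; time.sleep(30)", b"", 0.5))
```

Calling communicate() a second time after kill() both drains the pipes and waits for the child, so no zombie is left behind.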

Communication demo

This comes up again in the section on inter-process communication, so here is only a brief example, using the usual ps aux | grep bash:

import subprocess


def main():
    # ps aux | grep bash
    # process 1 produces the output
    p1 = subprocess.Popen(["ps", "-aux"], stdout=subprocess.PIPE)
    # process 2 filters the output of process 1
    p2 = subprocess.Popen(["grep", "bash"], stdin=p1.stdo
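
A two-process pipeline of this shape is commonly wired as in the sketch below, which follows the shell-pipeline replacement pattern described in the subprocess documentation. It is an assumption about how such a pipeline can be written, not a reconstruction of the original listing; the helper `pipe` is invented, and child Python interpreters stand in for ps and grep so that the example is portable.

```python
import subprocess
import sys


def pipe(producer_cmd, consumer_cmd):
    """Run `producer_cmd | consumer_cmd` and return the consumer's stdout."""
    p1 = subprocess.Popen(producer_cmd, stdout=subprocess.PIPE)
    p2 = subprocess.Popen(consumer_cmd, stdin=p1.stdout, stdout=subprocess.PIPE)
    p1.stdout.close()  # lets p1 receive SIGPIPE if p2 exits early
    out, _ = p2.communicate()
    p1.wait()          # reap the producer as well
    return out


# Stand-ins for `ps aux` and `grep bash`
PRODUCER = [sys.executable, "-c", "print('bash'); print('zsh')"]
CONSUMER = [sys.executable, "-c",
            "import sys; sys.stdout.writelines(l for l in sys.stdin if 'bash' in l)"]

if __name__ == '__main__':
    # On Linux this could also be pipe(["ps", "aux"], ["grep", "bash"])
    print(pipe(PRODUCER, CONSUMER).decode())
```

Closing the parent's copy of p1.stdout matters: otherwise the producer never sees a broken pipe when the consumer stops reading.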
